Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CI #1012

Merged
merged 12 commits into from
Nov 21, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* Testing:
* Improve `test_full_workflow.py` (\#971).
* Update `pytest-asyncio` to v0.21 (\#1008).
* Fix CI issue related to event loop and asyncpg (\#1012).

# 1.3.14 (do not use!)

Expand Down
8 changes: 1 addition & 7 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,12 @@ def check_basetemp(tpath: Path):
)


def pytest_configure(config):
    """
    Register custom pytest markers for this test suite.

    See https://docs.pytest.org/en/stable/how-to/mark.html#registering-marks
    """
    marker_definition = "slow: marks tests as slow"
    config.addinivalue_line("markers", marker_definition)


@pytest.fixture(scope="session")
def event_loop():
    # Session-scoped event loop: one loop is created for the whole test
    # session instead of one per test (presumably consumed by pytest-asyncio,
    # which looks up a fixture with this exact name — TODO confirm).
    _event_loop = asyncio.new_event_loop()
    # Debug mode surfaces slow callbacks and never-awaited coroutines.
    _event_loop.set_debug(True)
    yield _event_loop
    # Teardown after the whole session: close the loop explicitly.
    _event_loop.close()


@pytest.fixture(scope="session")
Expand Down
61 changes: 20 additions & 41 deletions tests/test_backend_slurm_shutdown.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import asyncio
import logging
import threading
import shlex
import subprocess
import time
from pathlib import Path

import pytest
from devtools import debug
Expand Down Expand Up @@ -114,33 +112,6 @@ def _sleep_and_return(sleep_time):
raise e


async def _write_shutdown_file(shutdown_file: Path, sleep_time):
    """
    Sleep for `sleep_time` seconds, then create an empty shutdown file.

    The `_write_shutdown_file` and `_auxiliary_run` pair follows the pattern
    in https://stackoverflow.com/a/59645689/19085332 (run a coroutine to
    completion on a dedicated loop from a worker thread).
    """
    logging.warning(f"[_write_shutdown_file] run START {time.perf_counter()=}")
    # Blocking sleep, to let the SLURM job move from PENDING to RUNNING
    time.sleep(sleep_time)
    debug(run_squeue())
    logging.warning(f"[_write_shutdown_file] run WRITE {time.perf_counter()=}")
    # An empty file at this path is the shutdown trigger
    shutdown_file.write_text("")
    assert shutdown_file.exists()
    logging.warning(f"[_write_shutdown_file] run END {time.perf_counter()=}")


def _auxiliary_run(shutdown_file: Path, sleep_time):
    """
    Run `_write_shutdown_file` to completion on a fresh event loop.

    Intended as a `threading.Thread` target, following
    https://stackoverflow.com/a/59645689/19085332.

    `asyncio.run` replaces the manual new_event_loop / set_event_loop /
    run_until_complete / close sequence: the original version never closed
    the loop when the coroutine raised (no try/finally) and left a stale
    loop registered on the thread; `asyncio.run` always closes its loop and
    resets the thread's current loop.
    """
    asyncio.run(_write_shutdown_file(shutdown_file, sleep_time))


def test_indirect_shutdown_during_map(
monkey_slurm,
monkey_slurm_user,
Expand All @@ -154,18 +125,23 @@ def fun_sleep(dummy):

shutdown_file = tmp_path / "shutdown"

# NOTE: the executor.map call below is blocking. For this reason, we call
# the scancel function from a different thread, so that we can make it
# happen during the workflow execution. The following block is based on
# https://stackoverflow.com/a/59645689/19085332

# NOTE: the executor.map call below is blocking. For this reason, we write
# the shutdown file from a subprocess.Popen, so that we can make it happen
# during the execution.
shutdown_sleep_time = 2
logging.warning(f"PRE THREAD START {time.perf_counter()=}")
_thread = threading.Thread(
target=_auxiliary_run, args=(shutdown_file, shutdown_sleep_time)
tmp_script = (tmp_path / "script.sh").as_posix()
debug(tmp_script)
with open(tmp_script, "w") as f:
f.write(f"sleep {shutdown_sleep_time}\n")
f.write(f"cat NOTHING > {shutdown_file.as_posix()}\n")

tmp_stdout = open((tmp_path / "stdout").as_posix(), "w")
tmp_stderr = open((tmp_path / "stderr").as_posix(), "w")
subprocess.Popen(
shlex.split(f"bash {tmp_script}"),
stdout=tmp_stdout,
stderr=tmp_stderr,
)
_thread.start()
logging.warning(f"POST THREAD START {time.perf_counter()=}")

with FractalSlurmExecutor(
slurm_user=monkey_slurm_user,
Expand All @@ -186,3 +162,6 @@ def fun_sleep(dummy):
list(res)
debug(e.value)
assert "shutdown" in str(e.value)

tmp_stdout.close()
tmp_stderr.close()
73 changes: 28 additions & 45 deletions tests/test_full_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,15 @@
Institute for Biomedical Research and Pelkmans Lab from the University of
Zurich.
"""
import asyncio
import glob
import logging
import os
import threading
import time
import shlex
import subprocess
from pathlib import Path

import pytest
from devtools import debug

from .fixtures_slurm import scancel_all_jobs_of_a_slurm_user
from fractal_server.app.runner import _backends

PREFIX = "/api/v1"
Expand All @@ -33,7 +30,6 @@
backends_available = list(_backends.keys())


@pytest.mark.slow
@pytest.mark.parametrize("backend", backends_available)
async def test_full_workflow(
db,
Expand All @@ -48,7 +44,6 @@ async def test_full_workflow(
request,
override_settings_factory,
):

override_settings_factory(
FRACTAL_RUNNER_BACKEND=backend,
FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path / f"artifacts-{backend}",
Expand Down Expand Up @@ -231,7 +226,6 @@ async def test_full_workflow(
assert "index" in list(output_dataset_json["meta"].keys())


@pytest.mark.slow
@pytest.mark.parametrize("backend", backends_available)
async def test_failing_workflow_UnknownError(
client,
Expand Down Expand Up @@ -332,7 +326,6 @@ async def test_failing_workflow_UnknownError(
print(job_status_data["log"])


@pytest.mark.slow
@pytest.mark.parametrize("backend", backends_available)
@pytest.mark.parametrize("failing_task", ["parallel", "non_parallel"])
async def test_failing_workflow_TaskExecutionError(
Expand All @@ -350,7 +343,6 @@ async def test_failing_workflow_TaskExecutionError(
override_settings_factory,
resource_factory,
):

override_settings_factory(
FRACTAL_RUNNER_BACKEND=backend,
FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path
Expand Down Expand Up @@ -500,30 +492,7 @@ async def test_failing_workflow_TaskExecutionError(
assert "index" in list(output_dataset_json["meta"].keys())


async def _auxiliary_scancel(slurm_user, sleep_time):
    """
    Wait `sleep_time` seconds, then cancel all SLURM jobs of `slurm_user`.

    The `_auxiliary_scancel` and `_auxiliary_run` pair follows the pattern in
    https://stackoverflow.com/a/59645689/19085332.
    """
    logging.warning(f"[scancel_thread] run START {time.perf_counter()=}")
    # Blocking sleep, to let the SLURM job move from PENDING to RUNNING
    time.sleep(sleep_time)
    logging.warning(f"[scancel_thread] run SCANCEL {time.perf_counter()=}")
    # Cancel every job belonging to the current SLURM user
    scancel_all_jobs_of_a_slurm_user(slurm_user=slurm_user, show_squeue=True)
    logging.warning(f"[scancel_thread] run END {time.perf_counter()=}")


def _auxiliary_run(slurm_user, sleep_time):
    """
    Run `_auxiliary_scancel` to completion on a fresh event loop.

    Intended as a `threading.Thread` target, following
    https://stackoverflow.com/a/59645689/19085332.

    `asyncio.run` replaces the manual new_event_loop / set_event_loop /
    run_until_complete / close sequence: the original version never closed
    the loop when the coroutine raised (no try/finally) and left a stale
    loop registered on the thread; `asyncio.run` always closes its loop and
    resets the thread's current loop.
    """
    asyncio.run(_auxiliary_scancel(slurm_user, sleep_time))


@pytest.mark.parametrize("backend", ["slurm"])
@pytest.mark.slow
async def test_failing_workflow_JobExecutionError(
backend,
client,
Expand All @@ -541,8 +510,8 @@ async def test_failing_workflow_JobExecutionError(
relink_python_interpreter,
cfut_jobs_finished,
resource_factory,
tmp_path,
):

override_settings_factory(
FRACTAL_RUNNER_BACKEND=backend,
FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path
Expand Down Expand Up @@ -603,19 +572,31 @@ async def test_failing_workflow_JobExecutionError(
debug(wftask1_id)

# NOTE: the client.post call below is blocking, due to the way we are
# running tests. For this reason, we call the scancel function from a
# different thread, so that we can make it happen during the workflow
# execution
# The following block is based on
# https://stackoverflow.com/a/59645689/19085332
# running tests. For this reason, we call the scancel function from a
# subprocess.Popen, so that we can make it happen during the
# execution.
scancel_sleep_time = 10
slurm_user = monkey_slurm_user
logging.warning(f"PRE THREAD START {time.perf_counter()=}")
_thread = threading.Thread(
target=_auxiliary_run, args=(slurm_user, scancel_sleep_time)

tmp_script = (tmp_path / "script.sh").as_posix()
debug(tmp_script)
with open(tmp_script, "w") as f:
f.write(f"sleep {scancel_sleep_time}\n")
f.write(
(
f"sudo --non-interactive -u {slurm_user} "
f"scancel -u {slurm_user} -v"
"\n"
)
)

tmp_stdout = open((tmp_path / "stdout").as_posix(), "w")
tmp_stderr = open((tmp_path / "stderr").as_posix(), "w")
subprocess.Popen(
shlex.split(f"bash {tmp_script}"),
stdout=tmp_stdout,
stderr=tmp_stderr,
)
_thread.start()
logging.warning(f"POST THREAD START {time.perf_counter()=}")

# Re-submit the modified workflow
res_second_apply = await client.post(
Expand Down Expand Up @@ -656,6 +637,9 @@ async def test_failing_workflow_JobExecutionError(
str(wftask1_id): "failed",
}

tmp_stdout.close()
tmp_stderr.close()


async def test_non_python_task(
client,
Expand Down Expand Up @@ -925,7 +909,6 @@ async def test_non_executable_task_command(
user_kwargs = {}

async with MockCurrentUser(persist=True, user_kwargs=user_kwargs) as user:

# Create task
task = await task_factory(
name="invalid-task-command",
Expand Down
Loading