Skip to content

Commit

Permalink
Avoid explicit use of threads and event loops in tests, in favor of P…
Browse files Browse the repository at this point in the history
…open (ref #954)
  • Loading branch information
tcompa committed Nov 21, 2023
1 parent 22ba341 commit 7e71d95
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 85 deletions.
62 changes: 20 additions & 42 deletions tests/test_backend_slurm_shutdown.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import asyncio
import logging
import threading
import shlex
import subprocess
import time
from pathlib import Path

import pytest
from devtools import debug
Expand Down Expand Up @@ -114,33 +112,6 @@ def _sleep_and_return(sleep_time):
raise e


async def _write_shutdown_file(shutdown_file: Path, sleep_time):
    """Wait `sleep_time` seconds, then create an empty shutdown-sentinel file.

    Used as the coroutine run by a helper thread (pattern from
    https://stackoverflow.com/a/59645689/19085332) so that the shutdown can
    be triggered while a blocking `executor.map` call is in progress.

    Args:
        shutdown_file: Path of the sentinel file to create.
        sleep_time: Seconds to wait before creating the file.
    """
    logging.warning(f"[_write_shutdown_file] run START {time.perf_counter()=}")
    # Wait `sleep_time` seconds, to let the SLURM job pass from PENDING to
    # RUNNING. Use `await asyncio.sleep` (not the blocking `time.sleep`) so
    # the event loop this coroutine runs on is never blocked.
    await asyncio.sleep(sleep_time)

    debug(run_squeue())
    logging.warning(f"[_write_shutdown_file] run WRITE {time.perf_counter()=}")
    # Trigger shutdown by creating an empty file — presumably the executor
    # watches for this file's existence (confirm against FractalSlurmExecutor).
    with shutdown_file.open("w") as f:
        f.write("")
    assert shutdown_file.exists()
    logging.warning(f"[_write_shutdown_file] run END {time.perf_counter()=}")


def _auxiliary_run(shutdown_file: Path, sleep_time):
    """Thread entry point: run `_write_shutdown_file` in a fresh event loop.

    Pattern from https://stackoverflow.com/a/59645689/19085332.

    Args:
        shutdown_file: Path of the sentinel file to create.
        sleep_time: Seconds to wait before creating the file.
    """
    # asyncio.run creates a new event loop, runs the coroutine, and always
    # closes the loop — including on error. The previous manual
    # new_event_loop/run_until_complete/close sequence leaked the loop if
    # run_until_complete raised.
    asyncio.run(_write_shutdown_file(shutdown_file, sleep_time))


def test_indirect_shutdown_during_map(
monkey_slurm,
monkey_slurm_user,
Expand All @@ -154,18 +125,23 @@ def fun_sleep(dummy):

shutdown_file = tmp_path / "shutdown"

# NOTE: the executor.map call below is blocking. For this reason, we call
# the scancel function from a different thread, so that we can make it
# happen during the workflow execution. The following block is based on
# https://stackoverflow.com/a/59645689/19085332

# NOTE: the executor.map call below is blocking. For this reason, we write
# the shutdown file from a subprocess.Popen, so that we can make it happen
# during the execution.
shutdown_sleep_time = 2
logging.warning(f"PRE THREAD START {time.perf_counter()=}")
_thread = threading.Thread(
target=_auxiliary_run, args=(shutdown_file, shutdown_sleep_time)
tmp_script = (tmp_path / "script.sh").as_posix()
debug(tmp_script)
with open(tmp_script, "w") as f:
f.write(f"sleep {shutdown_sleep_time}\n")
f.write(f"cat NOTHING > {shutdown_file.as_posix()}\n")

tmp_stdout = open((tmp_path / "stdout").as_posix(), "w")
tmp_stderr = open((tmp_path / "stderr").as_posix(), "w")
subprocess.Popen(
shlex.split(f"bash {tmp_script}"),
stdout=tmp_stdout,
stderr=tmp_stderr,
)
_thread.start()
logging.warning(f"POST THREAD START {time.perf_counter()=}")

with FractalSlurmExecutor(
slurm_user=monkey_slurm_user,
Expand All @@ -186,4 +162,6 @@ def fun_sleep(dummy):
list(res)
debug(e.value)
assert "shutdown" in str(e.value)
_thread.join()

tmp_stdout.close()
tmp_stderr.close()
71 changes: 28 additions & 43 deletions tests/test_full_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,15 @@
Institute for Biomedical Research and Pelkmans Lab from the University of
Zurich.
"""
import asyncio
import glob
import logging
import os
import threading
import time
import shlex
import subprocess
from pathlib import Path

import pytest
from devtools import debug

from .fixtures_slurm import scancel_all_jobs_of_a_slurm_user
from fractal_server.app.runner import _backends

PREFIX = "/api/v1"
Expand All @@ -47,7 +44,6 @@ async def test_full_workflow(
request,
override_settings_factory,
):

override_settings_factory(
FRACTAL_RUNNER_BACKEND=backend,
FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path / f"artifacts-{backend}",
Expand Down Expand Up @@ -347,7 +343,6 @@ async def test_failing_workflow_TaskExecutionError(
override_settings_factory,
resource_factory,
):

override_settings_factory(
FRACTAL_RUNNER_BACKEND=backend,
FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path
Expand Down Expand Up @@ -497,29 +492,6 @@ async def test_failing_workflow_TaskExecutionError(
assert "index" in list(output_dataset_json["meta"].keys())


async def _auxiliary_scancel(slurm_user, sleep_time):
    """Wait `sleep_time` seconds, then scancel all jobs of `slurm_user`.

    Used as the coroutine run by a helper thread (pattern from
    https://stackoverflow.com/a/59645689/19085332) so that the cancellation
    happens while a blocking workflow-submission call is in progress.

    Args:
        slurm_user: SLURM user whose jobs are cancelled.
        sleep_time: Seconds to wait before cancelling.
    """
    logging.warning(f"[scancel_thread] run START {time.perf_counter()=}")
    # Wait `sleep_time` seconds, to let the SLURM job pass from PENDING to
    # RUNNING. Use `await asyncio.sleep` (not the blocking `time.sleep`) so
    # the event loop this coroutine runs on is never blocked.
    await asyncio.sleep(sleep_time)
    # Scancel all jobs of the current SLURM user
    logging.warning(f"[scancel_thread] run SCANCEL {time.perf_counter()=}")
    scancel_all_jobs_of_a_slurm_user(slurm_user=slurm_user, show_squeue=True)
    logging.warning(f"[scancel_thread] run END {time.perf_counter()=}")


def _auxiliary_run(slurm_user, sleep_time):
    """Thread entry point: run `_auxiliary_scancel` in a fresh event loop.

    Pattern from https://stackoverflow.com/a/59645689/19085332.

    Args:
        slurm_user: SLURM user whose jobs are cancelled.
        sleep_time: Seconds to wait before cancelling.
    """
    # asyncio.run creates a new event loop, runs the coroutine, and always
    # closes the loop — including on error. The previous manual
    # new_event_loop/run_until_complete/close sequence leaked the loop if
    # run_until_complete raised.
    asyncio.run(_auxiliary_scancel(slurm_user, sleep_time))


@pytest.mark.skip
@pytest.mark.parametrize("backend", ["slurm"])
async def test_failing_workflow_JobExecutionError(
backend,
Expand All @@ -538,8 +510,8 @@ async def test_failing_workflow_JobExecutionError(
relink_python_interpreter,
cfut_jobs_finished,
resource_factory,
tmp_path,
):

override_settings_factory(
FRACTAL_RUNNER_BACKEND=backend,
FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path
Expand Down Expand Up @@ -600,19 +572,31 @@ async def test_failing_workflow_JobExecutionError(
debug(wftask1_id)

# NOTE: the client.post call below is blocking, due to the way we are
# running tests. For this reason, we call the scancel function from a
# different thread, so that we can make it happen during the workflow
# execution. The following block is based on
# https://stackoverflow.com/a/59645689/19085332
# running tests. For this reason, we call the scancel function from a
# subprocess.Popen, so that we can make it happen during the
# execution.
scancel_sleep_time = 10
slurm_user = monkey_slurm_user
logging.warning(f"PRE THREAD START {time.perf_counter()=}")
_thread = threading.Thread(
target=_auxiliary_run, args=(slurm_user, scancel_sleep_time)

tmp_script = (tmp_path / "script.sh").as_posix()
debug(tmp_script)
with open(tmp_script, "w") as f:
f.write(f"sleep {scancel_sleep_time}\n")
f.write(
(
f"sudo --non-interactive -u {slurm_user} "
f"scancel -u {slurm_user} -v"
"\n"
)
)

tmp_stdout = open((tmp_path / "stdout").as_posix(), "w")
tmp_stderr = open((tmp_path / "stderr").as_posix(), "w")
subprocess.Popen(
shlex.split(f"bash {tmp_script}"),
stdout=tmp_stdout,
stderr=tmp_stderr,
)
_thread.start()
logging.warning(f"POST THREAD START {time.perf_counter()=}")

# Re-submit the modified workflow
res_second_apply = await client.post(
Expand Down Expand Up @@ -652,7 +636,9 @@ async def test_failing_workflow_JobExecutionError(
str(wftask0_id): "done",
str(wftask1_id): "failed",
}
_thread.join()

tmp_stdout.close()
tmp_stderr.close()


async def test_non_python_task(
Expand Down Expand Up @@ -923,7 +909,6 @@ async def test_non_executable_task_command(
user_kwargs = {}

async with MockCurrentUser(persist=True, user_kwargs=user_kwargs) as user:

# Create task
task = await task_factory(
name="invalid-task-command",
Expand Down

0 comments on commit 7e71d95

Please sign in to comment.