Skip to content

Commit

Permalink
Merge pull request #1012 from fractal-analytics-platform/work_towards…
Browse files Browse the repository at this point in the history
…_fixing_ci

Fix CI
  • Loading branch information
tcompa authored Nov 21, 2023
2 parents 01a9a89 + 00fd71c commit 020516e
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 93 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* Testing:
* Improve `test_full_workflow.py` (\#971).
* Update `pytest-asyncio` to v0.21 (\#1008).
* Fix CI issue related to event loop and asyncpg (\#1012).

# 1.3.14 (do not use!)

Expand Down
8 changes: 1 addition & 7 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,12 @@ def check_basetemp(tpath: Path):
)


def pytest_configure(config):
    """
    Register the custom ``slow`` marker with pytest.

    See https://docs.pytest.org/en/stable/how-to/mark.html#registering-marks
    """
    marker_definition = "slow: marks tests as slow"
    config.addinivalue_line("markers", marker_definition)


@pytest.fixture(scope="session")
def event_loop():
    """
    Provide a single, session-scoped asyncio event loop.

    The loop runs with debug mode enabled and is closed at the end of the
    test session.
    """
    loop = asyncio.new_event_loop()
    loop.set_debug(True)
    yield loop
    loop.close()


@pytest.fixture(scope="session")
Expand Down
61 changes: 20 additions & 41 deletions tests/test_backend_slurm_shutdown.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import asyncio
import logging
import threading
import shlex
import subprocess
import time
from pathlib import Path

import pytest
from devtools import debug
Expand Down Expand Up @@ -114,33 +112,6 @@ def _sleep_and_return(sleep_time):
raise e


async def _write_shutdown_file(shutdown_file: Path, sleep_time):
    """
    After a delay, create an empty file that triggers executor shutdown.

    The delay lets the SLURM job move from PENDING to RUNNING before the
    shutdown is triggered. Pattern used as in
    https://stackoverflow.com/a/59645689/19085332

    Args:
        shutdown_file: Path of the file whose existence triggers shutdown.
        sleep_time: Seconds to wait before creating the file.
    """
    logging.warning(f"[_write_shutdown_file] run START {time.perf_counter()=}")
    # Wait, so that the SLURM job can pass from PENDING to RUNNING
    time.sleep(sleep_time)
    debug(run_squeue())
    logging.warning(f"[_write_shutdown_file] run WRITE {time.perf_counter()=}")
    # Writing an empty file is what triggers the shutdown
    shutdown_file.write_text("")
    assert shutdown_file.exists()
    logging.warning(f"[_write_shutdown_file] run END {time.perf_counter()=}")


def _auxiliary_run(shutdown_file: Path, sleep_time):
    """
    Synchronous wrapper running ``_write_shutdown_file`` on a fresh loop.

    Meant to be used as a thread target, as in
    https://stackoverflow.com/a/59645689/19085332

    Args:
        shutdown_file: Path of the file whose creation triggers shutdown.
        sleep_time: Seconds to wait before writing the file.
    """
    # asyncio.run() creates a new event loop, runs the coroutine and always
    # closes the loop afterwards -- unlike the manual
    # new_event_loop()/run_until_complete()/close() sequence, which would
    # leak an open loop if the coroutine raised before close() was reached.
    asyncio.run(_write_shutdown_file(shutdown_file, sleep_time))


def test_indirect_shutdown_during_map(
monkey_slurm,
monkey_slurm_user,
Expand All @@ -154,18 +125,23 @@ def fun_sleep(dummy):

shutdown_file = tmp_path / "shutdown"

# NOTE: the executor.map call below is blocking. For this reason, we call
# the scancel function from a different thread, so that we can make it
# happen during the workflow execution. The following block is based on
# https://stackoverflow.com/a/59645689/19085332

# NOTE: the executor.map call below is blocking. For this reason, we write
# the shutdown file from a subprocess.Popen, so that we can make it happen
# during the execution.
shutdown_sleep_time = 2
logging.warning(f"PRE THREAD START {time.perf_counter()=}")
_thread = threading.Thread(
target=_auxiliary_run, args=(shutdown_file, shutdown_sleep_time)
tmp_script = (tmp_path / "script.sh").as_posix()
debug(tmp_script)
with open(tmp_script, "w") as f:
f.write(f"sleep {shutdown_sleep_time}\n")
f.write(f"cat NOTHING > {shutdown_file.as_posix()}\n")

tmp_stdout = open((tmp_path / "stdout").as_posix(), "w")
tmp_stderr = open((tmp_path / "stderr").as_posix(), "w")
subprocess.Popen(
shlex.split(f"bash {tmp_script}"),
stdout=tmp_stdout,
stderr=tmp_stderr,
)
_thread.start()
logging.warning(f"POST THREAD START {time.perf_counter()=}")

with FractalSlurmExecutor(
slurm_user=monkey_slurm_user,
Expand All @@ -186,3 +162,6 @@ def fun_sleep(dummy):
list(res)
debug(e.value)
assert "shutdown" in str(e.value)

tmp_stdout.close()
tmp_stderr.close()
73 changes: 28 additions & 45 deletions tests/test_full_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,15 @@
Institute for Biomedical Research and Pelkmans Lab from the University of
Zurich.
"""
import asyncio
import glob
import logging
import os
import threading
import time
import shlex
import subprocess
from pathlib import Path

import pytest
from devtools import debug

from .fixtures_slurm import scancel_all_jobs_of_a_slurm_user
from fractal_server.app.runner import _backends

PREFIX = "/api/v1"
Expand All @@ -33,7 +30,6 @@
backends_available = list(_backends.keys())


@pytest.mark.slow
@pytest.mark.parametrize("backend", backends_available)
async def test_full_workflow(
db,
Expand All @@ -48,7 +44,6 @@ async def test_full_workflow(
request,
override_settings_factory,
):

override_settings_factory(
FRACTAL_RUNNER_BACKEND=backend,
FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path / f"artifacts-{backend}",
Expand Down Expand Up @@ -231,7 +226,6 @@ async def test_full_workflow(
assert "index" in list(output_dataset_json["meta"].keys())


@pytest.mark.slow
@pytest.mark.parametrize("backend", backends_available)
async def test_failing_workflow_UnknownError(
client,
Expand Down Expand Up @@ -332,7 +326,6 @@ async def test_failing_workflow_UnknownError(
print(job_status_data["log"])


@pytest.mark.slow
@pytest.mark.parametrize("backend", backends_available)
@pytest.mark.parametrize("failing_task", ["parallel", "non_parallel"])
async def test_failing_workflow_TaskExecutionError(
Expand All @@ -350,7 +343,6 @@ async def test_failing_workflow_TaskExecutionError(
override_settings_factory,
resource_factory,
):

override_settings_factory(
FRACTAL_RUNNER_BACKEND=backend,
FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path
Expand Down Expand Up @@ -500,30 +492,7 @@ async def test_failing_workflow_TaskExecutionError(
assert "index" in list(output_dataset_json["meta"].keys())


async def _auxiliary_scancel(slurm_user, sleep_time):
    """
    After a delay, cancel all SLURM jobs of the given user.

    The delay lets the SLURM job pass from PENDING to RUNNING before it is
    scancel-ed. The _auxiliary_scancel and _auxiliary_run functions are used
    as in https://stackoverflow.com/a/59645689/19085332

    Args:
        slurm_user: SLURM user whose jobs will all be cancelled.
        sleep_time: Seconds to wait before issuing the scancel.
    """
    logging.warning(f"[scancel_thread] run START {time.perf_counter()=}")
    time.sleep(sleep_time)
    logging.warning(f"[scancel_thread] run SCANCEL {time.perf_counter()=}")
    # Scancel all jobs of the current SLURM user
    scancel_all_jobs_of_a_slurm_user(slurm_user=slurm_user, show_squeue=True)
    logging.warning(f"[scancel_thread] run END {time.perf_counter()=}")


def _auxiliary_run(slurm_user, sleep_time):
    """
    Synchronous wrapper running ``_auxiliary_scancel`` on a fresh loop.

    Meant to be used as a thread target, as in
    https://stackoverflow.com/a/59645689/19085332

    Args:
        slurm_user: SLURM user whose jobs will all be cancelled.
        sleep_time: Seconds to wait before issuing the scancel.
    """
    # asyncio.run() creates a new event loop, runs the coroutine and always
    # closes the loop afterwards -- unlike the manual
    # new_event_loop()/run_until_complete()/close() sequence, which would
    # leak an open loop if the coroutine raised before close() was reached.
    asyncio.run(_auxiliary_scancel(slurm_user, sleep_time))


@pytest.mark.parametrize("backend", ["slurm"])
@pytest.mark.slow
async def test_failing_workflow_JobExecutionError(
backend,
client,
Expand All @@ -541,8 +510,8 @@ async def test_failing_workflow_JobExecutionError(
relink_python_interpreter,
cfut_jobs_finished,
resource_factory,
tmp_path,
):

override_settings_factory(
FRACTAL_RUNNER_BACKEND=backend,
FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path
Expand Down Expand Up @@ -603,19 +572,31 @@ async def test_failing_workflow_JobExecutionError(
debug(wftask1_id)

# NOTE: the client.post call below is blocking, due to the way we are
# running tests. For this reason, we call the scancel function from a
# different thread, so that we can make it happen during the workflow
# execution
# The following block is based on
# https://stackoverflow.com/a/59645689/19085332
# running tests. For this reason, we call the scancel function from
# a subprocess.Popen, so that we can make it happen during the
# execution.
scancel_sleep_time = 10
slurm_user = monkey_slurm_user
logging.warning(f"PRE THREAD START {time.perf_counter()=}")
_thread = threading.Thread(
target=_auxiliary_run, args=(slurm_user, scancel_sleep_time)

tmp_script = (tmp_path / "script.sh").as_posix()
debug(tmp_script)
with open(tmp_script, "w") as f:
f.write(f"sleep {scancel_sleep_time}\n")
f.write(
(
f"sudo --non-interactive -u {slurm_user} "
f"scancel -u {slurm_user} -v"
"\n"
)
)

tmp_stdout = open((tmp_path / "stdout").as_posix(), "w")
tmp_stderr = open((tmp_path / "stderr").as_posix(), "w")
subprocess.Popen(
shlex.split(f"bash {tmp_script}"),
stdout=tmp_stdout,
stderr=tmp_stderr,
)
_thread.start()
logging.warning(f"POST THREAD START {time.perf_counter()=}")

# Re-submit the modified workflow
res_second_apply = await client.post(
Expand Down Expand Up @@ -656,6 +637,9 @@ async def test_failing_workflow_JobExecutionError(
str(wftask1_id): "failed",
}

tmp_stdout.close()
tmp_stderr.close()


async def test_non_python_task(
client,
Expand Down Expand Up @@ -925,7 +909,6 @@ async def test_non_executable_task_command(
user_kwargs = {}

async with MockCurrentUser(persist=True, user_kwargs=user_kwargs) as user:

# Create task
task = await task_factory(
name="invalid-task-command",
Expand Down

0 comments on commit 020516e

Please sign in to comment.