Skip to content

Commit

Permalink
Catch FileNotFoundError in driver._execute_with_retry
Browse files Browse the repository at this point in the history
This commit makes the drivers handle FileNotFound exceptions in `driver._execute_with_retry(...)` and `driver.poll()` for the various drivers. The driver will now return early and avoid retrying invalid shell commands.
  • Loading branch information
jonathan-eq committed Sep 3, 2024
1 parent d71be2a commit f2c5ca0
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 31 deletions.
16 changes: 10 additions & 6 deletions src/ert/scheduler/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,16 @@ async def _execute_with_retry(
error_message: Optional[str] = None

for _ in range(total_attempts):
process = await asyncio.create_subprocess_exec(
*cmd_with_args,
stdin=asyncio.subprocess.PIPE if stdin else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
process = await asyncio.create_subprocess_exec(
*cmd_with_args,
stdin=asyncio.subprocess.PIPE if stdin else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as e:
return (False, str(e))

stdout, stderr = await process.communicate(stdin)

assert process.returncode is not None
Expand Down
23 changes: 14 additions & 9 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,15 +409,20 @@ async def poll(self) -> None:
await asyncio.sleep(self._poll_period)
continue
current_jobids = list(self._jobs.keys())
process = await asyncio.create_subprocess_exec(
str(self._bjobs_cmd),
"-noheader",
"-o",
"jobid stat delimiter='^'",
*current_jobids,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

try:
process = await asyncio.create_subprocess_exec(
str(self._bjobs_cmd),
"-noheader",
"-o",
"jobid stat delimiter='^'",
*current_jobids,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as e:
logger.error(str(e))
return

stdout, stderr = await process.communicate()
if process.returncode:
Expand Down
20 changes: 12 additions & 8 deletions src/ert/scheduler/openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,18 @@ async def poll(self) -> None:
continue

if self._non_finished_job_ids:
process = await asyncio.create_subprocess_exec(
str(self._qstat_cmd),
"-Ex",
"-w", # wide format
*self._non_finished_job_ids,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
process = await asyncio.create_subprocess_exec(
str(self._qstat_cmd),
"-Ex",
"-w", # wide format
*self._non_finished_job_ids,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as e:
logger.error(str(e))
return
stdout, stderr = await process.communicate()
if process.returncode not in {0, QSTAT_UNKNOWN_JOB_ID}:
# Any unknown job ids will yield QSTAT_UNKNOWN_JOB_ID, but
Expand Down
18 changes: 10 additions & 8 deletions src/ert/scheduler/slurm_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,16 @@ async def poll(self) -> None:
arguments = ["-h", "--format=%i %T"]
if self._user:
arguments.append(f"--user={self._user}")

process = await asyncio.create_subprocess_exec(
str(self._squeue),
*arguments,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

try:
process = await asyncio.create_subprocess_exec(
str(self._squeue),
*arguments,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as e:
logger.error(str(e))
return
stdout, stderr = await process.communicate()
if process.returncode:
logger.warning(
Expand Down
35 changes: 35 additions & 0 deletions tests/integration_tests/scheduler/test_generic_driver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging
import os
import signal
import sys
Expand Down Expand Up @@ -212,3 +213,37 @@ async def test_num_cpu_sets_env_variables(driver: Driver, tmp_path, job_name):
elif isinstance(driver, OpenPBSDriver):
assert "OMP_NUM_THREADS=2" in env_lines
assert "NCPUS=2" in env_lines


@pytest.mark.integration_test
async def test_execute_with_retry_exits_on_filenotfounderror(driver: Driver, caplog):
caplog.set_level(logging.DEBUG)
invalid_cmd = ["/usr/bin/foo", "bar"]
(succeeded, message) = await driver._execute_with_retry(
invalid_cmd, total_attempts=3
)

# We log a retry message every time we retry
assert "retry" not in str(caplog.text)
assert not succeeded
assert "No such file or directory" in message
assert "/usr/bin/foo" in message


@pytest.mark.integration_test
async def test_poll_exits_on_filenotfounderror(driver: Driver, caplog):
if isinstance(driver, LocalDriver):
pytest.skip("LocalDriver does not poll")
caplog.set_level(logging.DEBUG)
invalid_cmd = ["/usr/bin/foo", "bar"]
driver._bjobs_cmd = invalid_cmd
driver._qstat_cmd = invalid_cmd
driver._squeue = invalid_cmd
driver._jobs = {"foo": "bar"}
driver._non_finished_job_ids = ["foo"]
await driver.poll()

# We log a retry message every time we retry
assert "retry" not in str(caplog.text)
assert "No such file or directory" in str(caplog.text)
assert "/usr/bin/foo" in str(caplog.text)

0 comments on commit f2c5ca0

Please sign in to comment.