Skip to content

Commit

Permalink
Use asyncio.Lock for forward_model_ok
Browse files Browse the repository at this point in the history
This makes sure that we will not run more than 1 internalization job at a time.
  • Loading branch information
xjules committed Aug 26, 2024
1 parent 31d3aa2 commit 6437bb8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
25 changes: 14 additions & 11 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,20 @@ async def _verify_checksum(self, timeout: int = 120) -> None: # noqa: ASYNC109
logger.error(f"Disk synchronization failed for {file_path}")

async def _handle_finished_forward_model(self) -> None:
callback_status, status_msg = await forward_model_ok(self.real.run_arg)
if self._callback_status_msg:
self._callback_status_msg = status_msg
else:
self._callback_status_msg += f"\nstatus from done callback: {status_msg}"

if callback_status == LoadStatus.LOAD_SUCCESSFUL:
await self._send(JobState.COMPLETED)
else:
assert callback_status == LoadStatus.LOAD_FAILURE
await self._send(JobState.FAILED)
async with self._scheduler._fmok_lock:
callback_status, status_msg = await forward_model_ok(self.real.run_arg)
if self._callback_status_msg:
self._callback_status_msg = status_msg
else:
self._callback_status_msg += (
f"\nstatus from done callback: {status_msg}"
)

if callback_status == LoadStatus.LOAD_SUCCESSFUL:
await self._send(JobState.COMPLETED)
else:
assert callback_status == LoadStatus.LOAD_FAILURE
await self._send(JobState.FAILED)

async def _handle_failure(self) -> None:
assert self._requested_max_submit is not None
Expand Down
4 changes: 4 additions & 0 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ def __init__(
self._completed_jobs_num: int = 0
self.completed_jobs: asyncio.Queue[int] = asyncio.Queue()

# this lock is to assure that no more than 1 task
# does internalization at a time
self._fmok_lock: asyncio.Lock = asyncio.Lock()

self._cancelled = False
if max_submit < 0:
raise ValueError(
Expand Down

0 comments on commit 6437bb8

Please sign in to comment.