From fddffca67b4f7e3b76b19df7db8e1511c449d2d9 Mon Sep 17 00:00:00 2001
From: Alexander Goscinski
Date: Fri, 25 Oct 2024 23:21:53 +0200
Subject: [PATCH] `DirectScheduler`: Ensure killing child processes (#6572)

The current implementation issues a kill command only for the parent
process, which can leave child processes orphaned. The child processes are
now retrieved and explicitly added to the kill command.
---
 src/aiida/schedulers/plugins/direct.py | 25 +++++++++++++++++++++----
 tests/schedulers/test_direct.py        | 38 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 59 insertions(+), 4 deletions(-)

diff --git a/src/aiida/schedulers/plugins/direct.py b/src/aiida/schedulers/plugins/direct.py
index 78421acb73..694ff93863 100644
--- a/src/aiida/schedulers/plugins/direct.py
+++ b/src/aiida/schedulers/plugins/direct.py
@@ -8,6 +8,8 @@
 ###########################################################################
 """Plugin for direct execution."""
 
+from typing import Union
+
 import aiida.schedulers
 from aiida.common.escaping import escape_for_bash
 from aiida.schedulers import SchedulerError
@@ -354,13 +356,28 @@ def _parse_submit_output(self, retval, stdout, stderr):
 
         return stdout.strip()
 
-    def _get_kill_command(self, jobid):
-        """Return the command to kill the job with specified jobid."""
-        submit_command = f'kill {jobid}'
+    def _get_kill_command(self, jobid: Union[int, str]) -> str:
+        """Return the command to kill the process with the specified id and all its descendants.
+
+        :param jobid: In the case of the
+            :py:class:`~aiida.schedulers.plugins.direct.DirectScheduler`, the job id is the process id.
+
+        :return: A string containing the kill command.
+        """
+        from psutil import Process
+
+        # Get the process ids of all descendants of the process.
+        process = Process(int(jobid))
+        children = process.children(recursive=True)
+        jobids = [str(jobid)]
+        jobids.extend([str(child.pid) for child in children])
+        jobids_str = ' '.join(jobids)
+
+        kill_command = f'kill {jobids_str}'
 
         self.logger.info(f'killing job {jobid}')
 
-        return submit_command
+        return kill_command
 
     def _parse_kill_output(self, retval, stdout, stderr):
         """Parse the output of the kill command.
diff --git a/tests/schedulers/test_direct.py b/tests/schedulers/test_direct.py
index 8879bdc88e..175f5ede48 100644
--- a/tests/schedulers/test_direct.py
+++ b/tests/schedulers/test_direct.py
@@ -70,3 +70,41 @@ def test_submit_script_with_num_cores_per_mpiproc(scheduler, template):
     )
     result = scheduler.get_submit_script(template)
     assert f'export OMP_NUM_THREADS={num_cores_per_mpiproc}' in result
+
+
+@pytest.mark.timeout(timeout=10)
+def test_kill_job(scheduler, tmpdir):
+    """Test that ``kill_job`` kills all descendant children of the process.
+    For that we spawn a new process that runs a sleep command, then we
+    kill it and check if the sleep process is still alive.
+
+       current process     forked process        runs sleep.sh
+    python─────────────python────────────────bash──────sleep
+                       we kill this process          we check if still running
+    """
+    import multiprocessing
+    import time
+
+    from aiida.transports.plugins.local import LocalTransport
+    from psutil import Process
+
+    def run_sleep_100():
+        import subprocess
+
+        script = tmpdir / 'sleep.sh'
+        script.write('sleep 100')
+        # This call blocks until the spawned subprocess has finished.
+        subprocess.run(['bash', script.strpath], check=False)
+
+    forked_process = multiprocessing.Process(target=run_sleep_100)
+    forked_process.start()
+    while len(forked_process_children := Process(forked_process.pid).children(recursive=True)) != 2:
+        time.sleep(0.1)
+    bash_process = forked_process_children[0]
+    sleep_process = forked_process_children[1]
+    with LocalTransport() as transport:
+        scheduler.set_transport(transport)
+        scheduler.kill_job(forked_process.pid)
+    while bash_process.is_running() or sleep_process.is_running():
+        time.sleep(0.1)
+    forked_process.join()
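For illustration, here is a minimal standalone sketch (not part of the patch)
of the approach the new `_get_kill_command` takes: collect the pid of a
process plus the pids of all its recursive descendants with `psutil`, then
issue a single `kill` that targets the whole tree. The spawned process tree
and the sleep durations are arbitrary choices for this example.

    import subprocess
    import time

    from psutil import Process

    # Spawn a small process tree: python -> bash -> sleep. The trailing echo
    # keeps bash from exec-ing sleep directly, so a real child process exists.
    parent = subprocess.Popen(['bash', '-c', 'sleep 60; echo done'])
    time.sleep(0.5)  # give bash a moment to fork the sleep child

    # Parent pid first, then all recursive descendants, as in _get_kill_command.
    pids = [str(parent.pid)] + [str(child.pid) for child in Process(parent.pid).children(recursive=True)]
    kill_command = f"kill {' '.join(pids)}"
    print(kill_command)  # e.g. kill 12345 12346

    # Running the command terminates bash and sleep together; with the old
    # behaviour (killing only the parent) the sleep process would survive.
    subprocess.run(kill_command, shell=True, check=False)

Note that `kill` sends SIGTERM by default and that the pid list is a
snapshot: descendants spawned after the lookup are not targeted.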