Skip to content

Commit

Permalink
Scheduler: abstract generation of submit script env variables (#5283)
Browse files Browse the repository at this point in the history
The environment variables defined in the `JobTemplate` need to be
written to the submit script header. The `Scheduler.get_submit_script`
method relied on the abstract `_get_submit_script_header` method to do this.
This forces each plugin to write this code, even though this is very
likely to be scheduler independent.

Therefore it is best to abstract this functionality to the base class
such that each scheduler plugin automatically has this implemented. If
really needed, the plugin can still override the behavior by explicitly
reimplementing the `_get_submit_script_environment_variables` method.

The new `_get_submit_script_environment_variables` method could be called
in the `get_submit_script` method of the base class, such that it is
automatically included for all subclasses, but this is currently not
possible for backwards compatibility reasons. Since currently it is up
to plugins to format the variables themselves in the method
`_get_submit_script_header`, if we were to now also add it in the base
method, the environment variables would be included twice. Therefore, we
leave it to plugins to call the new method if they want to. This means
external plugins still have to be updated to use the new method for
printing the environment variables before they pick up any future
improvements that `aiida-core` makes to the implementation.

Note that it may look as though `_get_submit_script_environment_variables`
could be a `staticmethod`, but unfortunately it is not possible to call
`super()` when overriding a staticmethod in a subclass.
  • Loading branch information
sphuber authored Dec 19, 2021
1 parent 7fad822 commit c743a33
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 94 deletions.
26 changes: 3 additions & 23 deletions aiida/schedulers/plugins/direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,31 +150,11 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

env_lines = []

if job_tmpl.job_resource and job_tmpl.job_resource.num_cores_per_mpiproc:
# since this was introduced after the environment injection below,
# it is intentionally put before it to avoid breaking current users script by overruling
# any explicit OMP_NUM_THREADS they may have set in their job_environment
env_lines.append(f'export OMP_NUM_THREADS={job_tmpl.job_resource.num_cores_per_mpiproc}')

# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# hand.
lines.append(f'export OMP_NUM_THREADS={job_tmpl.job_resource.num_cores_per_mpiproc}')

if job_tmpl.job_environment:
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
env_lines.append(f'export {key.strip()}={escape_for_bash(value)}')

if env_lines:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
lines += env_lines
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)
lines.append(self._get_submit_script_environment_variables(job_tmpl))

if job_tmpl.rerunnable:
self.logger.warning(
Expand Down
18 changes: 1 addition & 17 deletions aiida/schedulers/plugins/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,6 @@ def _get_submit_script_header(self, job_tmpl):
import re
import string

empty_line = ''

lines = []
if job_tmpl.submit_as_hold:
lines.append('#BSUB -H')
Expand Down Expand Up @@ -434,22 +432,8 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# hand.
if job_tmpl.job_environment:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
lines.append(f'export {key.strip()}={escape_for_bash(value)}')
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)

lines.append(empty_line)
lines.append(self._get_submit_script_environment_variables(job_tmpl))

# The following seems to be the only way to copy the input files
# to the node where the computation are actually launched (the
Expand Down
15 changes: 1 addition & 14 deletions aiida/schedulers/plugins/pbsbaseclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,21 +297,8 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# hand.

if job_tmpl.job_environment:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
lines.append(f'export {key.strip()}={escape_for_bash(value)}')
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)
lines.append(self._get_submit_script_environment_variables(job_tmpl))

# Required to change directory to the working directory, that is
# the one from which the job was submitted
Expand Down
17 changes: 1 addition & 16 deletions aiida/schedulers/plugins/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ def _get_submit_script_header(self, job_tmpl):
import re
import string

empty_line = ''

lines = []

# SGE provides flags for wd and cwd
Expand Down Expand Up @@ -267,21 +265,8 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

# TAKEN FROM PBSPRO:
# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# hand.
if job_tmpl.job_environment:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
lines.append(f'export {key.strip()}={escape_for_bash(value)}')
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)
lines.append(self._get_submit_script_environment_variables(job_tmpl))

return '\n'.join(lines)

Expand Down
20 changes: 1 addition & 19 deletions aiida/schedulers/plugins/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
"""
import re

from aiida.common.escaping import escape_for_bash
from aiida.common.lang import type_check
from aiida.schedulers import Scheduler, SchedulerError
from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource
Expand Down Expand Up @@ -263,8 +262,6 @@ def _get_submit_script_header(self, job_tmpl):
# pylint: disable=too-many-statements,too-many-branches
import string

empty_line = ''

lines = []
if job_tmpl.submit_as_hold:
lines.append('#SBATCH -H')
Expand Down Expand Up @@ -398,23 +395,8 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# hand.

if job_tmpl.job_environment:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
lines.append(f'export {key.strip()}={escape_for_bash(value)}')
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)

lines.append(empty_line)
lines.append(self._get_submit_script_environment_variables(job_tmpl))

return '\n'.join(lines)

Expand Down
18 changes: 18 additions & 0 deletions aiida/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,24 @@ def get_submit_script(self, job_tmpl):

return '\n'.join(script_lines)

def _get_submit_script_environment_variables(self, template):  # pylint: disable=no-self-use
    """Build the environment-variable section of the submit script header.

    Every entry of ``template.job_environment`` is turned into an
    ``export KEY=VALUE`` line, where the key is stripped of surrounding
    whitespace and the value is passed through ``escape_for_bash``. The
    export lines are wrapped between a BEGIN and an END marker comment.

    :parameter template: a `aiida.schedulers.datastructures.JobTemplate` instance.
    :return: string containing environment variable declarations.
    :raises ValueError: if ``template.job_environment`` is not a dictionary.
    """
    environment = template.job_environment

    # Reject anything that is not a dict up front; this includes ``None``,
    # so callers are expected to check that an environment was provided.
    if not isinstance(environment, dict):
        raise ValueError('If you provide job_environment, it must be a dictionary')

    exports = [f'export {name.strip()}={escape_for_bash(content)}' for name, content in environment.items()]

    return '\n'.join(['# ENVIRONMENT VARIABLES BEGIN ###', *exports, '# ENVIRONMENT VARIABLES END ###'])

@abc.abstractmethod
def _get_submit_script_header(self, job_tmpl):
"""Return the submit script header, using the parameters from the job template.
Expand Down
13 changes: 8 additions & 5 deletions tests/schedulers/test_sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging
import unittest

from aiida.common.datastructures import CodeRunMode
from aiida.schedulers.datastructures import JobState
from aiida.schedulers.plugins.sge import SgeScheduler
from aiida.schedulers.scheduler import SchedulerError, SchedulerParsingError
Expand Down Expand Up @@ -310,6 +311,8 @@ def test_submit_script(self):
sge = SgeScheduler()

job_tmpl = JobTemplate()
job_tmpl.codes_info = []
job_tmpl.codes_run_mode = CodeRunMode.SERIAL
job_tmpl.job_resource = sge.create_job_resource(parallel_env='mpi8', tot_num_mpiprocs=16)
job_tmpl.working_directory = '/home/users/dorigm7s/test'
job_tmpl.submit_as_hold = None
Expand All @@ -325,14 +328,12 @@ def test_submit_script(self):
job_tmpl.max_wallclock_seconds = '3600' # "23:59:59"
job_tmpl.job_environment = {'HOME': '/home/users/dorigm7s/', 'WIENROOT': '$HOME:/WIEN2k'}

submit_script_text = sge._get_submit_script_header(job_tmpl)
submit_script_text = sge.get_submit_script(job_tmpl)

self.assertTrue('#$ -wd /home/users/dorigm7s/test' in submit_script_text)
self.assertTrue('#$ -N BestJobEver' in submit_script_text)
self.assertTrue('#$ -q FavQ.q' in submit_script_text)
self.assertTrue('#$ -l h_rt=01:00:00' in submit_script_text)
# self.assertTrue( 'export HOME=/home/users/dorigm7s/'
# in submit_script_text )
self.assertTrue('# ENVIRONMENT VARIABLES BEGIN ###' in submit_script_text)
self.assertTrue("export HOME='/home/users/dorigm7s/'" in submit_script_text)
self.assertTrue("export WIENROOT='$HOME:/WIEN2k'" in submit_script_text)
Expand All @@ -345,15 +346,17 @@ def test_submit_script_rerunnable(self): # pylint: disable=no-self-use
sge = SgeScheduler()

job_tmpl = JobTemplate()
job_tmpl.codes_info = []
job_tmpl.codes_run_mode = CodeRunMode.SERIAL
job_tmpl.job_resource = sge.create_job_resource(parallel_env='mpi8', tot_num_mpiprocs=16)

job_tmpl.rerunnable = True
submit_script_text = sge._get_submit_script_header(job_tmpl)
submit_script_text = sge.get_submit_script(job_tmpl)
assert '#$ -r yes' in submit_script_text
assert '#$ -r no' not in submit_script_text

job_tmpl.rerunnable = False
submit_script_text = sge._get_submit_script_header(job_tmpl)
submit_script_text = sge.get_submit_script(job_tmpl)
assert '#$ -r yes' not in submit_script_text
assert '#$ -r no' in submit_script_text

Expand Down

0 comments on commit c743a33

Please sign in to comment.