Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler: abstract generation of submit script env variables #5283

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 15 additions & 26 deletions aiida/schedulers/plugins/direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,32 +150,6 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

env_lines = []

if job_tmpl.job_resource and job_tmpl.job_resource.num_cores_per_mpiproc:
# since this was introduced after the environment injection below,
# it is intentionally put before it to avoid breaking current users script by overruling
# any explicit OMP_NUM_THREADS they may have set in their job_environment
env_lines.append(f'export OMP_NUM_THREADS={job_tmpl.job_resource.num_cores_per_mpiproc}')

# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# hand.
if job_tmpl.job_environment:
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
env_lines.append(f'export {key.strip()}={escape_for_bash(value)}')

if env_lines:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
lines += env_lines
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)

if job_tmpl.rerunnable:
self.logger.warning(
"The 'rerunnable' option is set to 'True', but has no effect when using the direct scheduler."
Expand All @@ -199,6 +173,21 @@ def _get_submit_script_header(self, job_tmpl):

return '\n'.join(lines)

def _get_submit_script_environment_variables(self, template):
    """Return the part of the submit script header that defines environment variables.

    The declarations produced by the parent implementation are extended with an ``OMP_NUM_THREADS`` export when
    ``template.job_resource.num_cores_per_mpiproc`` is set.

    :parameter template: a `aiida.schedulers.datastructures.JobTemplate` instance.
    :return: string containing environment variable declarations, without a trailing newline.
    """
    result = super()._get_submit_script_environment_variables(template)

    if template.job_resource and template.job_resource.num_cores_per_mpiproc:
        # This should be prepended to the environment variables from the template, such that it does not overrule
        # any explicit OMP_NUM_THREADS that may have been defined in the ``template.job_environment``.
        omp_num_threads = f'export OMP_NUM_THREADS={template.job_resource.num_cores_per_mpiproc}'

        # Only insert the separating newline when the parent actually returned declarations; otherwise the
        # result would end in a newline and the caller would emit a stray blank line inside the environment
        # variables section of the submit script.
        result = f'{omp_num_threads}\n{result}' if result else omp_num_threads

    return result

def _get_submit_command(self, submit_script):
"""
Return the string to execute to submit a given script.
Expand Down
19 changes: 0 additions & 19 deletions aiida/schedulers/plugins/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,6 @@ def _get_submit_script_header(self, job_tmpl):
import re
import string

empty_line = ''

lines = []
if job_tmpl.submit_as_hold:
lines.append('#BSUB -H')
Expand Down Expand Up @@ -434,23 +432,6 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# hand.
if job_tmpl.job_environment:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
lines.append(f'export {key.strip()}={escape_for_bash(value)}')
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)

lines.append(empty_line)

# The following seems to be the only way to copy the input files
# to the node where the computation are actually launched (the
# -f option of bsub that does not always work...)
Expand Down
16 changes: 0 additions & 16 deletions aiida/schedulers/plugins/pbsbaseclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,22 +297,6 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# hand.

if job_tmpl.job_environment:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
lines.append(f'export {key.strip()}={escape_for_bash(value)}')
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)

# Required to change directory to the working directory, that is
# the one from which the job was submitted
lines.append('cd "$PBS_O_WORKDIR"')
Expand Down
18 changes: 0 additions & 18 deletions aiida/schedulers/plugins/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ def _get_submit_script_header(self, job_tmpl):
import re
import string

empty_line = ''

lines = []

# SGE provides flags for wd and cwd
Expand Down Expand Up @@ -267,22 +265,6 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

# TAKEN FROM PBSPRO:
# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# hand.
if job_tmpl.job_environment:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
lines.append(f'export {key.strip()}={escape_for_bash(value)}')
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)

return '\n'.join(lines)

def _get_submit_command(self, submit_script):
Expand Down
21 changes: 0 additions & 21 deletions aiida/schedulers/plugins/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
"""
import re

from aiida.common.escaping import escape_for_bash
from aiida.common.lang import type_check
from aiida.schedulers import Scheduler, SchedulerError
from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource
Expand Down Expand Up @@ -263,8 +262,6 @@ def _get_submit_script_header(self, job_tmpl):
# pylint: disable=too-many-statements,too-many-branches
import string

empty_line = ''

lines = []
if job_tmpl.submit_as_hold:
lines.append('#SBATCH -H')
Expand Down Expand Up @@ -398,24 +395,6 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# hand.

if job_tmpl.job_environment:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
lines.append(f'export {key.strip()}={escape_for_bash(value)}')
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)

lines.append(empty_line)

return '\n'.join(lines)

def _get_submit_command(self, submit_script):
Expand Down
22 changes: 22 additions & 0 deletions aiida/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ def get_submit_script(self, job_tmpl):
script_lines.append(self._get_submit_script_header(job_tmpl))
script_lines.append(empty_line)

environment_variables = self._get_submit_script_environment_variables(job_tmpl)
if environment_variables:
script_lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
script_lines.append(environment_variables)
script_lines.append('# ENVIRONMENT VARIABLES END ###')
script_lines.append(empty_line)

if job_tmpl.prepend_text:
script_lines.append(job_tmpl.prepend_text)
script_lines.append(empty_line)
Expand All @@ -170,6 +177,21 @@ def get_submit_script(self, job_tmpl):

return '\n'.join(script_lines)

def _get_submit_script_environment_variables(self, template): # pylint: disable=no-self-use
"""Return the part of the submit script header that defines environment variables.

:parameter template: a `aiida.schedulers.datastrutures.JobTemplate` instance.
:return: string containing environment variable declarations.
"""
if template.job_environment is None:
return ''

if not isinstance(template.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')

lines = [f'export {key.strip()}={escape_for_bash(value)}' for key, value in template.job_environment.items()]
return '\n'.join(lines)

@abc.abstractmethod
def _get_submit_script_header(self, job_tmpl):
"""Return the submit script header, using the parameters from the job template.
Expand Down
13 changes: 8 additions & 5 deletions tests/schedulers/test_sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging
import unittest

from aiida.common.datastructures import CodeRunMode
from aiida.schedulers.datastructures import JobState
from aiida.schedulers.plugins.sge import SgeScheduler
from aiida.schedulers.scheduler import SchedulerError, SchedulerParsingError
Expand Down Expand Up @@ -310,6 +311,8 @@ def test_submit_script(self):
sge = SgeScheduler()

job_tmpl = JobTemplate()
job_tmpl.codes_info = []
job_tmpl.codes_run_mode = CodeRunMode.SERIAL
job_tmpl.job_resource = sge.create_job_resource(parallel_env='mpi8', tot_num_mpiprocs=16)
job_tmpl.working_directory = '/home/users/dorigm7s/test'
job_tmpl.submit_as_hold = None
Expand All @@ -325,14 +328,12 @@ def test_submit_script(self):
job_tmpl.max_wallclock_seconds = '3600' # "23:59:59"
job_tmpl.job_environment = {'HOME': '/home/users/dorigm7s/', 'WIENROOT': '$HOME:/WIEN2k'}

submit_script_text = sge._get_submit_script_header(job_tmpl)
submit_script_text = sge.get_submit_script(job_tmpl)

self.assertTrue('#$ -wd /home/users/dorigm7s/test' in submit_script_text)
self.assertTrue('#$ -N BestJobEver' in submit_script_text)
self.assertTrue('#$ -q FavQ.q' in submit_script_text)
self.assertTrue('#$ -l h_rt=01:00:00' in submit_script_text)
# self.assertTrue( 'export HOME=/home/users/dorigm7s/'
# in submit_script_text )
self.assertTrue('# ENVIRONMENT VARIABLES BEGIN ###' in submit_script_text)
self.assertTrue("export HOME='/home/users/dorigm7s/'" in submit_script_text)
self.assertTrue("export WIENROOT='$HOME:/WIEN2k'" in submit_script_text)
Expand All @@ -345,15 +346,17 @@ def test_submit_script_rerunnable(self): # pylint: disable=no-self-use
sge = SgeScheduler()

job_tmpl = JobTemplate()
job_tmpl.codes_info = []
job_tmpl.codes_run_mode = CodeRunMode.SERIAL
job_tmpl.job_resource = sge.create_job_resource(parallel_env='mpi8', tot_num_mpiprocs=16)

job_tmpl.rerunnable = True
submit_script_text = sge._get_submit_script_header(job_tmpl)
submit_script_text = sge.get_submit_script(job_tmpl)
assert '#$ -r yes' in submit_script_text
assert '#$ -r no' not in submit_script_text

job_tmpl.rerunnable = False
submit_script_text = sge._get_submit_script_header(job_tmpl)
submit_script_text = sge.get_submit_script(job_tmpl)
assert '#$ -r yes' not in submit_script_text
assert '#$ -r no' in submit_script_text

Expand Down