Skip to content

Commit

Permalink
Separate JobProcess submit task in folder upload and scheduler submit (
Browse files Browse the repository at this point in the history
…#1946)

Originally, the submit TransportTask of a JobProcess included both the
uploading of the work directory on the remote machine as well as the
call to the scheduler to submit the job. These have now been separated
into two separate TransportTasks, `upload` and `submit`, in order to
provide finer grained control over the job process control flow.
  • Loading branch information
sphuber authored Oct 2, 2018
1 parent 1f73e50 commit b809472
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 19 deletions.
9 changes: 9 additions & 0 deletions .ci/test_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,15 @@ def main():
print("#" * 78)
print("####### TIME ELAPSED: {} s".format(time.time() - start_time))
print("#" * 78)
print("Output of 'verdi process list -a':")
try:
print(subprocess.check_output(
["verdi", "process", "list", "-a"],
stderr=subprocess.STDOUT,
))
except subprocess.CalledProcessError as e:
print("Note: the command failed, message: {}".format(e))

print("Output of 'verdi calculation list -a':")
try:
print(subprocess.check_output(
Expand Down
6 changes: 4 additions & 2 deletions aiida/common/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from __future__ import absolute_import
import logging
from copy import deepcopy
from logging import config
from aiida.common import setup

# Custom logging level, intended specifically for informative log messages
Expand Down Expand Up @@ -151,6 +150,7 @@ def emit(self, record):
},
}


def configure_logging(daemon=False, daemon_log_file=None):
"""
Setup the logging by retrieving the LOGGING dictionary from aiida and passing it to
Expand All @@ -163,6 +163,8 @@ def configure_logging(daemon=False, daemon_log_file=None):
of the default 'console' StreamHandler
:param daemon_log_file: absolute filepath of the log file for the RotatingFileHandler
"""
from logging.config import dictConfig

config = deepcopy(LOGGING)
daemon_handler_name = 'daemon_log_file'

Expand All @@ -185,7 +187,7 @@ def configure_logging(daemon=False, daemon_log_file=None):
for name, logger in config.get('loggers', {}).items():
logger.setdefault('handlers', []).append(daemon_handler_name)

logging.config.dictConfig(config)
dictConfig(config)


def get_dblogger_extra(obj):
Expand Down
52 changes: 44 additions & 8 deletions aiida/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@
from aiida.orm.data.folder import FolderData
from aiida.scheduler.datastructures import JOB_STATES

REMOTE_WORK_DIRECTORY_LOST_FOUND = 'lost+found'

execlogger = aiidalogger.getChild('execmanager')


def submit_calculation(calculation, transport, calc_info, script_filename):
def upload_calculation(calculation, transport, calc_info, script_filename):
"""
Submit a calculation
Upload a calculation
:param calculation: the instance of JobCalculation to submit.
:param transport: an already opened transport to use to submit the calculation.
Expand Down Expand Up @@ -103,11 +104,33 @@ def submit_calculation(calculation, transport, calc_info, script_filename):
transport.chdir(calc_info.uuid[:2])
transport.mkdir(calc_info.uuid[2:4], ignore_existing=True)
transport.chdir(calc_info.uuid[2:4])
transport.mkdir(calc_info.uuid[4:])
transport.chdir(calc_info.uuid[4:])

try:
# The final directory may already exist, most likely because this function was already executed once, but
# failed and as a result was rescheduled by the engine. In this case it would be fine to delete the folder
# and create it from scratch, except that we cannot be sure that this is the actual case. Therefore, to err on
# the safe side, we move the folder to the lost+found directory before recreating the folder from scratch
transport.mkdir(calc_info.uuid[4:])
except OSError:
# Move the existing directory to lost+found, log a warning and create a clean directory anyway
path_existing = os.path.join(transport.getcwd(), calc_info.uuid[4:])
path_lost_found = os.path.join(remote_working_directory, REMOTE_WORK_DIRECTORY_LOST_FOUND)
path_target = os.path.join(path_lost_found, calc_info.uuid)
execlogger.warning('tried to create path {} but it already exists, moving the entire folder to {}'.format(
path_existing, path_target))

# Make sure the lost+found directory exists, then copy the existing folder there and delete the original
transport.mkdir(path_lost_found, ignore_existing=True)
transport.copytree(path_existing, path_target)
transport.rmtree(path_existing)

# Now we can create a clean folder for this calculation
transport.mkdir(calc_info.uuid[4:])
finally:
transport.chdir(calc_info.uuid[4:])

# I store the workdir of the calculation for later file retrieval
workdir = transport.getcwd()
# I store the workdir of the calculation for later file
# retrieval
calculation._set_remote_workdir(workdir)

# I first create the code files, so that the code can put
Expand Down Expand Up @@ -193,10 +216,23 @@ def submit_calculation(calculation, transport, calc_info, script_filename):
remotedata.add_link_from(calculation, label='remote_folder', link_type=LinkType.CREATE)
remotedata.store()

scheduler = computer.get_scheduler()
return calc_info, script_filename


def submit_calculation(calculation, transport, calc_info, script_filename):
    """
    Submit a calculation

    :param calculation: the instance of JobCalculation to submit.
    :param transport: an already opened transport to use to submit the calculation.
    :param calc_info: the calculation info datastructure returned by `JobCalculation._presubmit`
        (unused here; kept so the signature mirrors the other transport task functions)
    :param script_filename: the job launch script returned by `JobCalculation._presubmit`
    """
    # Obtain the scheduler plugin of the calculation's computer and hook it up to the open transport
    scheduler = calculation.get_computer().get_scheduler()
    scheduler.set_transport(transport)

    # The launch script was placed in the remote work directory by the preceding upload task
    remote_workdir = calculation._get_remote_workdir()
    job_identifier = scheduler.submit_from_script(remote_workdir, script_filename)
    calculation._set_job_id(job_identifier)


Expand Down
96 changes: 87 additions & 9 deletions aiida/work/job_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

__all__ = ['JobProcess']

UPLOAD_COMMAND = 'upload'
SUBMIT_COMMAND = 'submit'
UPDATE_COMMAND = 'update'
RETRIEVE_COMMAND = 'retrieve'
Expand All @@ -47,6 +48,61 @@
logger = logging.getLogger(__name__)


@coroutine
def task_upload_job(node, transport_queue, calc_info, script_filename, cancel_flag):
    """
    Transport task that will attempt to upload the files of a job calculation to the remote

    The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
    function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
    retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
    If all retries fail, the task will raise a TransportTaskException

    :param node: the node that represents the job calculation
    :param transport_queue: the TransportQueue from which to request a Transport
    :param calc_info: the calculation info datastructure returned by `JobCalculation._presubmit`
    :param script_filename: the job launch script returned by `JobCalculation._presubmit`
    :param cancel_flag: the cancelled flag that will be queried to determine whether the task was cancelled
    :raises: Return if the task was successfully completed
    :raises: TransportTaskException if after the maximum number of retries the transport task still excepted
    """
    initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
    max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS  # NOTE(review): constant name (defined elsewhere) carries a typo

    authinfo = node.get_computer().get_authinfo(node.get_user())

    state_pending = calc_states.SUBMITTING

    # Move the node to SUBMITTING before the upload begins, but only if that is a valid forward state transition
    if is_progressive_state_change(node.get_state(), state_pending):
        node._set_state(state_pending)
    else:
        logger.warning('ignored invalid proposed state change: {} to {}'.format(node.get_state(), state_pending))

    @coroutine
    def do_upload():
        # Request a transport for this authinfo; the coroutine resumes once one is granted by the queue
        with transport_queue.request_transport(authinfo) as request:
            transport = yield request

            # It may have taken time to get the transport, check if we've been cancelled
            if cancel_flag.is_cancelled:
                raise plumpy.CancelledError('task_upload_job for calculation<{}> cancelled'.format(node.pk))

            logger.info('uploading calculation<{}>'.format(node.pk))
            # `raise Return(...)` is the Tornado-coroutine way of returning a value
            raise Return(execmanager.upload_calculation(node, transport, calc_info, script_filename))

    try:
        # Retry the upload with exponentially increasing intervals; cancellation is excluded from the retry logic
        result = yield exponential_backoff_retry(
            do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=plumpy.CancelledError)
    except plumpy.CancelledError:
        # Cancellation is not a failure: end the task quietly without a result
        pass
    except Exception:
        # All retry attempts were exhausted: surface this as a TransportTaskException to the caller
        logger.warning('uploading calculation<{}> failed'.format(node.pk))
        raise TransportTaskException('upload_calculation failed {} times consecutively'.format(max_attempts))
    else:
        logger.info('uploading calculation<{}> successful'.format(node.pk))
        raise Return(result)


@coroutine
def task_submit_job(node, transport_queue, calc_info, script_filename, cancel_flag):
"""
Expand Down Expand Up @@ -82,14 +138,8 @@ def do_submit():
logger.info('submitting calculation<{}>'.format(node.pk))
raise Return(execmanager.submit_calculation(node, transport, calc_info, script_filename))

state_pending = calc_states.SUBMITTING
state_success = calc_states.WITHSCHEDULER

if is_progressive_state_change(node.get_state(), state_pending):
node._set_state(state_pending)
else:
logger.warning('ignored invalid proposed state change: {} to {}'.format(node.get_state(), state_pending))

try:
result = yield exponential_backoff_retry(
do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=plumpy.CancelledError)
Expand Down Expand Up @@ -288,7 +338,11 @@ def execute(self):

try:

if command == SUBMIT_COMMAND:
if command == UPLOAD_COMMAND:
calc_info, script_filename = yield self._launch_task(task_upload_job, calculation, transport_queue, *args)
raise Return(self.submit(calc_info, script_filename))

elif command == SUBMIT_COMMAND:
yield self._launch_task(task_submit_job, calculation, transport_queue, *args)
raise Return(self.scheduler_update())

Expand Down Expand Up @@ -337,6 +391,30 @@ def _launch_task(self, coro, *args, **kwargs):
finally:
self._task = None

def upload(self, calc_info, script_filename):
    """
    Create the next state to go to

    :param calc_info: the calculation info datastructure returned by `JobCalculation._presubmit`
    :param script_filename: the job launch script returned by `JobCalculation._presubmit`
    :return: The appropriate WAITING state
    """
    # The payload instructs the waiting state to execute the upload transport task next
    payload = (UPLOAD_COMMAND, calc_info, script_filename)
    return self.create_state(
        processes.ProcessState.WAITING, None, msg='Waiting for calculation folder upload', data=payload)

def submit(self, calc_info, script_filename):
    """
    Create the next state to go to

    :param calc_info: the calculation info datastructure returned by `JobCalculation._presubmit`
    :param script_filename: the job launch script returned by `JobCalculation._presubmit`
    :return: The appropriate WAITING state
    """
    # The payload instructs the waiting state to execute the scheduler submit transport task next
    payload = (SUBMIT_COMMAND, calc_info, script_filename)
    return self.create_state(
        processes.ProcessState.WAITING, None, msg='Waiting for scheduler submission', data=payload)

def scheduler_update(self):
"""
Create the next state to go to
Expand Down Expand Up @@ -555,8 +633,8 @@ def run(self):
# After this call, no modifications to the folder should be done
self.calc._store_raw_input_folder(folder.abspath)

# Launch the submit operation
return plumpy.Wait(msg='Waiting to submit', data=(SUBMIT_COMMAND, calc_info, script_filename))
# Launch the upload operation
return plumpy.Wait(msg='Waiting to upload', data=(UPLOAD_COMMAND, calc_info, script_filename))

def retrieved(self, retrieved_temporary_folder=None):
"""
Expand Down

0 comments on commit b809472

Please sign in to comment.