Skip to content

Commit

Permalink
Merge branch 'master' into release/2.2.10
Browse files Browse the repository at this point in the history
  • Loading branch information
calchoo authored Aug 19, 2021
2 parents 6b8b3b2 + ad01c94 commit 0f2b8b9
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 18 deletions.
22 changes: 13 additions & 9 deletions mavis/schedule/local.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import atexit
from concurrent import futures
from datetime import datetime
import logging
import multiprocessing
import os
from concurrent import futures
from datetime import datetime

import shortuuid

from ..util import LOG
from ..annotate.file_io import REFERENCE_DEFAULTS, ReferenceFile

from ..util import LOG
from .constants import JOB_STATUS, SCHEDULER
from .job import Job
from .scheduler import Scheduler
from .constants import JOB_STATUS, SCHEDULER


class LocalJob(Job):
Expand Down Expand Up @@ -53,12 +52,16 @@ def flatten(self):
return {k: v for k, v in result.items() if k not in omit}


def write_stamp_callback(response):
def write_stamp_callback(job, response):
if response.exception() or response.cancelled() or response.running():
LOG('error:' + str(response.exception()), level='ERROR')
if not os.path.exists(job.logfile()):
with open(job.logfile(), 'w') as fh:
fh.write('error:' + str(response.exception()))
return
try:
LOG('writing:', response.complete_stamp, time_stamp=True, indent_level=1)
with open(response.complete_stamp, 'w') as fh:
LOG('writing:', job.complete_stamp(), time_stamp=True, indent_level=1)
with open(job.complete_stamp(), 'w') as fh:
fh.write('end: {}\n'.format(int(datetime.timestamp(datetime.utcnow()))))
except Exception as err:
LOG('error writing the complete stamp', level=logging.CRITICAL, indent_level=1)
Expand Down Expand Up @@ -111,7 +114,7 @@ def submit(self, job):
job.func, args
) # no arguments, defined all in the job object
setattr(job.response, 'complete_stamp', job.complete_stamp())
job.response.add_done_callback(write_stamp_callback)
job.response.add_done_callback(lambda resp: write_stamp_callback(job, resp))
self.submitted[job.job_ident] = job
job.rank = len(self.submitted)
LOG('submitted', job.name, indent_level=1)
Expand All @@ -125,6 +128,7 @@ def wait(self):
return
self.pool.shutdown(True)
self.pool = None

for job in self.submitted.values():
self.update_info(job)

Expand Down
25 changes: 16 additions & 9 deletions mavis/schedule/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
from configparser import ConfigParser, ExtendedInterpolation
import os
import re
import shutil
import subprocess
from configparser import ConfigParser, ExtendedInterpolation

from shortuuid import uuid

from ..cluster import constants as _CLUSTER
from ..constants import SUBCOMMAND, PROTOCOL, EXIT_ERROR, EXIT_OK, EXIT_INCOMPLETE
from ..tools import convert_tool_output
from ..util import mkdirp, output_tabbed_file, LOG, DEVNULL
from ..validate import constants as _VALIDATE
from ..annotate import constants as _ANNOTATE
from ..annotate import file_io as _file_io
from ..cluster import constants as _CLUSTER
from ..constants import EXIT_ERROR, EXIT_INCOMPLETE, EXIT_OK, PROTOCOL, SUBCOMMAND
from ..summary import constants as _SUMMARY
from .job import Job, ArrayJob, LogFile, TorqueArrayJob
from .scheduler import SlurmScheduler, TorqueScheduler, SgeScheduler, consecutive_ranges
from ..tools import convert_tool_output
from ..util import DEVNULL, LOG, mkdirp, output_tabbed_file
from ..validate import constants as _VALIDATE
from .constants import JOB_STATUS, OPTIONS, SCHEDULER, STD_OPTIONS
from .job import ArrayJob, Job, LogFile, TorqueArrayJob
from .local import LocalJob, LocalScheduler
from .constants import JOB_STATUS, STD_OPTIONS, OPTIONS, SCHEDULER
from .scheduler import SgeScheduler, SlurmScheduler, TorqueScheduler, consecutive_ranges

PROGNAME = shutil.which('mavis')

SHEBANG = '#!/bin/bash'
SCHEDULERS_BY_NAME = {
sched.NAME: sched for sched in [SlurmScheduler, TorqueScheduler, LocalScheduler, SgeScheduler]
Expand Down Expand Up @@ -260,6 +261,12 @@ def write_submission_script(self, subcommand, job, args, aligner_path=None):
args (dict): arguments for the subcommand
"""
LOG('writing:', job.script, time_stamp=True)
if PROGNAME is None:
raise FileNotFoundError(
'The mavis executable was not found on the current PATH. '
'This is required in order to create submission scripts. '
'Please add mavis to the PATH first with export PATH=$PATH:/path/to/mavis/bin'
)
with open(job.script, 'w') as fh:
fh.write(
"""{shebang}
Expand Down

0 comments on commit 0f2b8b9

Please sign in to comment.