diff --git a/README.md b/README.md index a8bb21e..9f9403c 100644 --- a/README.md +++ b/README.md @@ -33,8 +33,8 @@ Possible values for "taskstatus" are: Jobs are marked 'auto' either by submitting through the python class ``prisms_jobs.Job`` -with the attribute ``auto=True``, or by submitting a PBS script which contains -the line ``#auto=True`` using the included ``psub`` script. +with the attribute ``auto=True``, or by submitting a script which contains +the line ``#auto=True`` using the included ``psub`` command line program. Jobs can be monitored using the command line program ``pstat``. All 'auto' jobs which have stopped can be resubmitted using ``pstat --continue``. In this case, @@ -70,7 +70,7 @@ Jobs not marked 'auto' are shown with the status "Check" in ``pstat`` until the marks them as "Complete". -## Installation from PyPI (todo) +## Installation from PyPI Using ``pip``: @@ -85,7 +85,7 @@ If installing to a user directory, you may need to set your PATH to find the ins export PATH=$PATH:`python -m site --user-base`/bin -## Install using conda (todo) +## Install using conda conda config --add channels prisms-center conda install prisms-jobs @@ -99,9 +99,9 @@ If installing to a user directory, you may need to set your PATH to find the ins git clone https://github.com/prisms-center/prisms_jobs.git cd prisms_jobs -2. Checkout the branch/tag containing the version you wish to install. Latest is ``v3.0.1``: +2. Checkout the branch/tag containing the version you wish to install. Latest is ``v4.0.0``: - git checkout v3.0.1 + git checkout v4.0.0 2. 
From the root directory of the repository: diff --git a/build_conda.sh b/build_conda.sh index 7f969c3..5d4dfec 100644 --- a/build_conda.sh +++ b/build_conda.sh @@ -1,6 +1,5 @@ # begin anaconda login -conda config --set anaconda_upload yes # build, get location of result, upload conda build conda-recipes/prisms_jobs > conda-recipes/tmp.out @@ -19,5 +18,4 @@ conda convert --platform linux-64 $LOCATION -o conda-recipes anaconda upload --user prisms-center conda-recipes/linux-64/prisms-jobs* # finish -conda config --set anaconda_upload no anaconda logout diff --git a/conda-recipes/prisms_jobs/meta.yaml b/conda-recipes/prisms_jobs/meta.yaml index ec2da72..393350b 100644 --- a/conda-recipes/prisms_jobs/meta.yaml +++ b/conda-recipes/prisms_jobs/meta.yaml @@ -1,18 +1,22 @@ package: name: prisms-jobs - version: "3.0.1" + version: "4.0.0" source: - git_rev: v3.0.1 + git_rev: v4.0.0 git_url: https://github.com/prisms-center/prisms_jobs.git requirements: build: - python - setuptools + - future + - six - argparse # [py26] run: - python + - future + - six - argparse # [py26] about: diff --git a/doc/source/api/index.rst b/doc/source/api/index.rst index ab6dab9..7323b8a 100644 --- a/doc/source/api/index.rst +++ b/doc/source/api/index.rst @@ -16,7 +16,6 @@ prisms_jobs prisms_jobs.EligibilityError prisms_jobs.complete_job prisms_jobs.error_job - prisms_jobs.set_software prisms_jobs.interface --------------------- @@ -27,6 +26,26 @@ prisms_jobs.interface prisms_jobs.interface.torque prisms_jobs.interface.slurm prisms_jobs.interface.default + +prisms_jobs.config +--------------------- + +.. 
autosummary:: + :toctree: + + prisms_jobs.config.configure + prisms_jobs.config.dbpath + prisms_jobs.config.settings + prisms_jobs.config.read_config + prisms_jobs.config.write_config + prisms_jobs.config.default_settings + prisms_jobs.config.config_dir + prisms_jobs.config.config_path + prisms_jobs.config.update_selection_method + prisms_jobs.config.set_update_selection_method + prisms_jobs.config.software + prisms_jobs.config.set_software + prisms_jobs.config.detect_software prisms_jobs.misc ---------------- diff --git a/doc/source/conf.py b/doc/source/conf.py index a54293c..dbce178 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -84,9 +84,9 @@ # built documents. # # The short X.Y version. -version = u'3.0' +version = u'4.0' # The full version, including alpha/beta/rc tags. -release = u'3.0b0' +release = u'4.0.0' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/doc/source/config.rst b/doc/source/config.rst index 22670f2..583da16 100644 --- a/doc/source/config.rst +++ b/doc/source/config.rst @@ -3,28 +3,90 @@ Configuration ============= -Environment variables (typically not necessary): +Some configuration is possible: -- ``PRISMS_JOBS_DB``: +- ``PRISMS_JOBS_DIR``: (optional, default=``$HOME/.prisms_jobs``) - The SQLite jobs database is stored by default at ``$HOME/.prisms_jobs/jobs.db``. - If ``PRISMS_JOBS_DB`` is set, then the jobs database is stored at - ``$PRISMS_JOBS_DB/jobs.db``. + The jobs database is stored at ``$PRISMS_JOBS_DIR/jobs.db``. -- ``PRISMS_JOBS_SOFTWARE``: +- ``PRISMS_JOBS_DIR/config.json``: - By default, ``prisms-jobs`` will attempt to automatically - detect ``'torque'`` (by checking for the 'qstat' executable) or ``'slurm'`` (by - checking for the 'sbatch' executable). The ``'default'`` module provides stubs to - enable testing/use on systems with no job management software.
If ``PRISMS_JOBS_SOFTWARE`` - is set to any other value, it is treated as the name of a Python module containing - a custom interface which ``prisms_jobs`` will attempt to import and use. + Automatically generated JSON configuration file storing settings: + + - ``"dbpath"``: (str) + + The location of the SQLite jobs database. + + - ``"software"``: (str) + + The job submission software interface to use. ``"torque"`` or ``"slurm"`` + is automatically detected if present. + + +-------------------+------------------------------------------------+ + |"torque" |TORQUE | + +-------------------+------------------------------------------------+ + |"slurm" |Slurm | + +-------------------+------------------------------------------------+ + |"default" (or null)|Empty stub, does nothing | + +-------------------+------------------------------------------------+ + |other |The name of an existing findable python module | + | |implementing an interface | + +-------------------+------------------------------------------------+ -- ``PRISMS_JOBS_UPDATE``: + - ``"write_submit_script"``: (bool, optional, default=false) + + If ``true``, submit jobs by first writing a submit script file and then + submitting it. Otherwise, by default, the job is submitted via the command + line. + + - ``"update_method"``: (str, optional, default="default") + + Controls which jobs are updated when JobDB.update() is called. + + +-------------------+------------------------------------------------+ + |"default" (or null)| Select jobs with jobstatus != 'C' | + +-------------------+------------------------------------------------+ + |"check_hostname" | Select jobs with jobstatus != 'C' and matching | + | | hostname. This is useful on compute clusters | + | | where multiple machines with different queues | + | | share the same ``PRISMS_JOBS_DIR``. 
| + +-------------------+------------------------------------------------+ + + - ``"taskmaster_job_kwargs"``: (JSON object, optional) + + Holds options for the `taskmaster`_ job. Defaults are: + + +-----------+------------------------------------------------+ + |'name' | "taskmaster" | + +-----------+------------------------------------------------+ + |'account' | "prismsprojectdebug_fluxoe" | + +-----------+------------------------------------------------+ + |'nodes' | "1" | + +-----------+------------------------------------------------+ + |'ppn' | "1" | + +-----------+------------------------------------------------+ + |'walltime' | "1:00:00" | + +-----------+------------------------------------------------+ + |'pmem' | "3800mb" | + +-----------+------------------------------------------------+ + |'qos' | "flux" | + +-----------+------------------------------------------------+ + |'queue' | "fluxoe" | + +-----------+------------------------------------------------+ + |'message' | null | + +-----------+------------------------------------------------+ + |'email' | null | + +-----------+------------------------------------------------+ + |'priority' | "-1000" | + +-----------+------------------------------------------------+ + |'command' | "rm taskmaster.o*; rm taskmaster.e*\\n" | + +-----------+------------------------------------------------+ + |'auto' | false | + +-----------+------------------------------------------------+ + + Additionally, the ``'exetime'`` is set based on the ``--delay`` + commandline argument and the commandline invocation used to launch + ``taskmaster`` is appended to ``'command'``. - If unset or set to ``'default'``, the ``pstat`` script - will attempt to update the status of all jobs that are not yet complete (``'C'``). 
- For systems with multiple-clusters-same-home, this may be set to ``'check_hostname'`` - and ``pstat`` will only attempt to update the status of jobs that are not yet - complete (``'C'``) and have matching hostname, as determined by ``socket.gethostname()``. +.. _taskmaster: scripts/taskmaster.html diff --git a/doc/source/index.rst b/doc/source/index.rst index cba5a04..b42e4d0 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -1,7 +1,4 @@ -.. prisms_jobs documentation master file, created by - sphinx-quickstart on Mon Sep 11 12:22:02 2017. - You can adapt this file completely to your liking, but it should at least - contain the root `toctree` directive. +.. prisms_jobs Welcome to prisms-jobs's documentation! ======================================= diff --git a/doc/source/install.rst b/doc/source/install.rst index 8da1825..be78e8f 100644 --- a/doc/source/install.rst +++ b/doc/source/install.rst @@ -45,11 +45,11 @@ Install from source git clone https://github.com/prisms-center/prisms_jobs.git cd prisms_jobs -2. Checkout the branch/tag containing the version you wish to install. Latest is ``v3.0.1``: +2. Checkout the branch/tag containing the version you wish to install. Latest is ``v4.0.0``: :: - git checkout v3.0.1 + git checkout v4.0.0 3. From the root directory of the repository: diff --git a/doc/source/overview.rst b/doc/source/overview.rst index 0b0400e..562ebf1 100644 --- a/doc/source/overview.rst +++ b/doc/source/overview.rst @@ -32,8 +32,8 @@ Possible values for "taskstatus" are: Jobs are marked 'auto' either by submitting through the python class ``prisms_jobs.Job`` -with the attribute ``auto=True``, or by submitting a PBS script which contains -the line ``#auto=True`` using the included ``psub`` script. +with the attribute ``auto=True``, or by submitting a script which contains +the line ``#auto=True`` using the included ``psub`` command line program. Jobs can be monitored using the command line program ``pstat``. 
All 'auto' jobs which have stopped can be resubmitted using ``pstat --continue``. In this case, @@ -61,7 +61,9 @@ Example screen shot: Additionally, when scheduling periodic jobs is not allowed other ways, the ``taskmaster`` script can fully automate this process. ``taskmaster`` executes -``pstat --continue`` and then resubmits itself to execute again periodically. +``pstat --continue`` and then resubmits itself to execute again periodically. As +not all compute resources allow this behavior, remember to check the policy prior +to using ``taskmaster`` on a new compute resource. A script marked 'auto' should check itself for completion and when reached execute ``pstat --complete $JOBID --force`` in bash, or ``prisms_jobs.complete_job()`` diff --git a/doc/source/scripts/pstat.rst b/doc/source/scripts/pstat.rst index a75fa01..24f5a48 100644 --- a/doc/source/scripts/pstat.rst +++ b/doc/source/scripts/pstat.rst @@ -1,12 +1,10 @@ .. scripts/pstat.rst -pstat -===== - - -Summary -------- +``pstat`` +========= +Summary: +-------- ``pstat`` gives command line access to the jobs database. It can be used to: @@ -28,12 +26,11 @@ Summary - Delete jobs from the database (and abort if currently running) -Help documentation: -------------------- +``--help`` documentation: +------------------------- .. argparse:: - :filename: scripts/pstat + :filename: prisms_jobs/scripts/pstat.py :func: make_parser :prog: pstat - - + diff --git a/doc/source/scripts/psub.rst b/doc/source/scripts/psub.rst index 4ae23da..2cb9575 100644 --- a/doc/source/scripts/psub.rst +++ b/doc/source/scripts/psub.rst @@ -1,15 +1,18 @@ .. scripts/psub.rst -psub -==== +``psub`` +======== + +Summary: +-------- ``psub`` submits a job script and adds the job to the job database. -Help documentation: -------------------- +``--help`` documentation: +------------------------- ..
argparse:: - :filename: scripts/psub + :filename: prisms_jobs/scripts/psub.py :func: parser :prog: psub diff --git a/doc/source/scripts/taskmaster.rst b/doc/source/scripts/taskmaster.rst index be73a95..c36c291 100644 --- a/doc/source/scripts/taskmaster.rst +++ b/doc/source/scripts/taskmaster.rst @@ -1,26 +1,28 @@ .. scripts/taskmaster.rst -taskmaster -========== +``taskmaster`` +============== -``taskmaster`` submits a job on the PRISMS flux debug queue that will repeatedly -resubmit any ``Auto`` jobs in the job database that have completed but whose -taskstatus is still ``'Incomplete'`` (perhaps because the jobs has hit the walltime -before completing or failed to converge) and then resubmit itself with a delay -before execution. +Summary: +-------- -To use on machines other than flux change the line containing +``taskmaster`` submits a job that will repeatedly resubmit any ``Auto`` jobs in +the job database that have completed but whose taskstatus is still ``'Incomplete'`` +(perhaps because the job has hit the walltime before completing or failed to +converge) and then resubmit itself with a delay before execution. As not all +compute resources allow this behavior, remember to check the policy prior to using +``taskmaster`` on a new compute resource. -:: +The job submission options can be customized by editing the ``prisms-jobs`` +`configuration file`_. - j = prisms_jobs.templates.PrismsDebugJob(...) -Help documentation: -------------------- +``--help`` documentation: +------------------------- ..
argparse:: - :filename: scripts/taskmaster + :filename: prisms_jobs/scripts/taskmaster.py :func: parser :prog: taskmaster - - + +.. _`configuration file`: config.html diff --git a/prisms_jobs/__init__.py b/prisms_jobs/__init__.py index 72d7dea..8192bf1 100644 --- a/prisms_jobs/__init__.py +++ b/prisms_jobs/__init__.py @@ -1,62 +1,7 @@ """Job submission and management""" -import os -import imp -from distutils.spawn import find_executable -import warnings - -_IMPORT_WARNING_MSG = """\ -prisms_jobs does not detect any job management software -and the 'PRISMS_JOBS_SOFTWARE' environment variable is not set. -""" - -def set_software(user_override=None): - """ - Import interface with job management software as module named ``prisms_jobs.software`` - - Args: - user_override (str, optional, default=None): - - By default, will attempt to automatically detect 'torque' (via 'qstat') - or 'slurm' (via 'sbatch'), or lets user override choice of module via - the input argument or 'PRISMS_JOBS_SOFTWARE' environment variable.
The - options are: - - * 'torque' - * 'slurm' - * Anything else will be treated as the name of an existing python module - - Raises: - prisms_jobs.JobDBError: If 'PRISMS_JOBS_SOFTWARE' option is unrecognized - - """ - global software - if user_override is not None: - if user_override.lower() == 'torque': - import prisms_jobs.interface.torque as software - elif user_override.lower() == 'slurm': - import prisms_jobs.interface.slurm as software - elif user_override.lower() == 'default': - import prisms_jobs.interface.default as software - else: - try: - f, filename, description = imp.find_module(user_override) - try: - software = imp.load_module(user_override, f, filename, description) - finally: - if f: - f.close() - except: - raise Exception('Unrecognized PRISMS_JOBS_SOFTWARE: ' + user_override) - elif find_executable("qsub") is not None: - import prisms_jobs.interface.torque as software - elif find_executable("sbatch") is not None: - import prisms_jobs.interface.slurm as software - else: - #warnings.warn(_IMPORT_WARNING_MSG) - import prisms_jobs.interface.default as software class JobsError(Exception): - """ A custom error class for pbs errors """ + """ A custom error class for prisms_jobs errors """ def __init__(self, jobid, msg): self.jobid = jobid self.msg = msg @@ -66,13 +11,10 @@ def __str__(self): return self.jobid + ": " + self.msg # import into 'prisms_jobs' -from job import Job -from jobdb import JobDB, JobDBError, EligibilityError, complete_job, error_job -import templates - -set_software(user_override = os.environ.get('PRISMS_JOBS_SOFTWARE', None)) +from prisms_jobs.job import Job +from prisms_jobs.jobdb import JobDB, JobDBError, EligibilityError, complete_job, error_job -__version__ = '3.0.1' +__version__ = '4.0.0' __all__ = [ 'Job', 'JobDB', @@ -80,5 +22,4 @@ def __str__(self): 'JobDBError', 'EligibilityError', 'complete_job', - 'error_job' - 'set_software'] + 'error_job'] diff --git a/prisms_jobs/config.py b/prisms_jobs/config.py new file mode 100644 
index 0000000..0544109 --- /dev/null +++ b/prisms_jobs/config.py @@ -0,0 +1,269 @@ +"""Configuration""" +from __future__ import (absolute_import, division, print_function, unicode_literals) +from builtins import * + +import imp +import json +import os +import six +import socket +import warnings +from distutils.spawn import find_executable + +import prisms_jobs + +__settings = None +__software = None +__update_selection_method = None + +_IMPORT_WARNING_MSG = """\ +prisms_jobs does not detect any job management software +and the 'PRISMS_JOBS_SOFTWARE' environment variable is not set. +""" + +def detect_software(): + """ + Detect installed job management software + + Returns: + A string naming the job management software, or None. Possibilities are: + + * 'torque' - detected via 'qsub' + * 'slurm' - detected via 'sbatch' + """ + if find_executable('qsub') is not None: + return 'torque' + elif find_executable('sbatch') is not None: + return 'slurm' + else: + return None + +def set_software(software_name=None): + """ + Import interface with job management software as module named ``prisms_jobs.software`` + + Args: + software_name (str, optional, default=None): + + =================== ======================================================= + 'torque' TORQUE + 'slurm' SLURM + The name of an existing findable python module + None or 'default' Empty stub, does nothing + =================== ======================================================= + + Raises: + prisms_jobs.JobDBError: If software_name is unrecognized + + """ + if software_name is None: + software_name = 'default' + if software_name.lower() == 'default': + import prisms_jobs.interface.default as software + elif software_name.lower() == 'torque': + import prisms_jobs.interface.torque as software + elif software_name.lower() == 'slurm': + import prisms_jobs.interface.slurm as software + else: + try: + f, filename, description = imp.find_module(software_name) + try: + software = imp.load_module(software_name, f, 
filename, description) + finally: + if f: + f.close() + except: + raise Exception('Unrecognized \'software\': ' + software_name) + global __software + __software = software + +def software(): + """The job management software interface module""" + if __software is None: + configure() + return __software + + +def _default_update_selection(curs): + """Select jobs with jobstatus!='C'""" + curs.execute("SELECT jobid FROM jobs WHERE jobstatus!='C'") + +def _check_hostname_update_selection(curs): + """Select jobs with jobstatus!='C' and matching hostname""" + hostname = socket.gethostname() + + # Parse our hostname so we can only select jobs from THIS host + # Otherwise, if we're on a multiple-clusters-same-home setup, + # we may incorrectly update jobs from one cluster onto the other + m = re.search(r"(.*?)(?=[^a-zA-Z0-9]*login.*)", hostname) #pylint: disable=invalid-name + if m: + hostname_regex = m.group(1) + ".*" + else: + hostname_regex = hostname + ".*" + + curs.execute("SELECT jobid FROM jobs WHERE jobstatus!='C' AND hostname REGEXP ?", + (hostname_regex, )) + + +def set_update_selection_method(update_method=None): + """Enable customization of which jobs are selected for JobDB.update() + + Args: + update_method (str, optional): + + =================== ======================================================= + 'default' (or None) Select jobs with jobstatus != 'C' + 'check_hostname' Select jobs with jobstatus != 'C' and matching hostname + =================== ======================================================= + + Raises: + prisms_jobs.JobsError: For unexpected value. 
+ """ + global __update_selection_method + if update_method is None: + __update_selection_method = _default_update_selection + elif update_method.lower() == 'default': + __update_selection_method = _default_update_selection + elif update_method.lower() == 'check_hostname': + __update_selection_method = _check_hostname_update_selection + else: + raise JobsError('Unrecognized update_method: ' + update_method) + +def update_selection_method(): + """The jobdb update selection method function""" + if __update_selection_method is None: + configure() + return __update_selection_method + + +def config_dir(): + """Return configuration directory""" + return os.environ.get('PRISMS_JOBS_DIR', os.path.join(os.environ['HOME'], '.prisms_jobs')) + +def config_path(dir=None): + """Return configuration file location""" + if dir is None: + dir = config_dir() + return os.path.join(dir, 'config.json') + +def default_settings(dir=None): + """Default configuration dictionary + + Args: + dir (str): Location of the directory storing the config.json file. + + Notes: + See configure for details. + + Returns: + Dict with configuration settings. + """ + if dir is None: + dir = config_dir() + return { + 'dbpath': os.path.join(dir, 'jobs.db'), + 'software': detect_software(), + 'write_submit_script': False, + 'update_method': 'default' + } + +def read_config(dir=None): + """Read configuration file. + + Note: + Will create with default values if not existing. + + Args: + dir (str, optional): Location of the directory storing the config.json file. + The default location is ``$PRISMS_JOBS_DIR/.prisms_jobs``, where + the environment variable ``PRISMS_JOBS_DIR``=``$HOME`` if not set. + + Returns: + Dict with configuration settings. 
+ """ + if dir is None: + dir = config_dir() + if not os.path.exists(dir): + print("Creating config directory:", dir) + os.mkdir(dir) + configpath = config_path(dir) + if not os.path.exists(configpath): + settings = default_settings(dir) + write_config(dir, settings) + return settings + else: + with open(configpath, 'r') as f: + try: + return json.load(f) + except Exception as e: + print("Could not read", configpath + ":") + print("-------------") + print(f.read()) + print("-------------") + print("Delete to restart from defaults\n") + raise e + +def write_config(dir=None, settings=None): + """Write current configuration settings to file. + + Args: + dir (str, optional): Location of the directory storing the config.json file. + The default location is ``$PRISMS_JOBS_DIR/.prisms_jobs``, where + the environment variable ``PRISMS_JOBS_DIR``=``$HOME`` if not set. + settings (dict, optional): Settings to write to config.json file. Uses + current settings by default. + + Returns: + Dict with configuration settings. + """ + if dir is None: + dir = config_dir() + if settings is None: + settings = __settings + with open(config_path(dir), 'w') as f: + # for python2/3 compatibility don't use json.dump: + f.write(json.dumps(settings, indent=2, ensure_ascii=False)) + +def settings(): + """Settings dictionary""" + if __settings is None: + configure() + return __settings + +def dbpath(): + """Settings dictionary""" + if __settings is None: + configure() + return __settings['dbpath'] + +def configure(settings=None): + """Set configuration + + Sets the global configuration settings dictionary. Options are: + + * 'dbpath': (str) + Location of the SQLite jobs database. + * 'software': (str) + Which job management software to use. See set_software for options. + * 'write_submit_script': (bool, default=False) + If true, write submit script to file and then submit job; otherwise + submit via command line. 
+ * 'update_method': (str, default='default') + Controls which jobs are updated when JobDB.update() is called. + See set_update_selection_method for options. + + The values are then used to update: + * software: Module used to interface with job submission software + * update_selection_method: Function used by JobDB.update() + + Args: + config_dict (dict, optional): All configuration settings. Default reads + configuration settings using read_config. + """ + if settings is None: + settings = read_config() + global __settings + __settings = settings + set_software(__settings['software']) + set_update_selection_method(__settings['update_method']) + diff --git a/prisms_jobs/interface/default.py b/prisms_jobs/interface/default.py index e9a9489..cff5724 100644 --- a/prisms_jobs/interface/default.py +++ b/prisms_jobs/interface/default.py @@ -1,5 +1,6 @@ """ Stub to use when running on a machine without job management software """ - +from __future__ import (absolute_import, division, print_function, unicode_literals) +from builtins import * ### Required ### diff --git a/prisms_jobs/interface/slurm.py b/prisms_jobs/interface/slurm.py index fff7dd1..4a76a1d 100644 --- a/prisms_jobs/interface/slurm.py +++ b/prisms_jobs/interface/slurm.py @@ -1,19 +1,21 @@ """ Functions for interfacing between slurm and the prisms_jobs module """ #pylint: disable=line-too-long, too-many-locals, too-many-branches +from __future__ import (absolute_import, division, print_function, unicode_literals) +from builtins import * ### External ### -import subprocess +import datetime import os -import StringIO import re -import datetime +import subprocess import time -# import sys + +from io import StringIO ### Internal ### from prisms_jobs import JobsError -from prisms_jobs.misc import getlogin, seconds +from prisms_jobs.misc import getlogin, run, seconds def _squeue(jobid=None, username=getlogin(), full=False, sformat=None): #pylint: disable=unused-argument """Return the stdout of squeue minus the 
header lines. @@ -23,7 +25,8 @@ def _squeue(jobid=None, username=getlogin(), full=False, sformat=None): #pyli 'jobid' is a string or list of strings of job ids 'sformat' is a squeue format string (e.g., "%A %i %j %c") - Returns the text of squeue, minus the header lines + Returns: + str: the text of squeue, minus the header lines """ # If Full is true, we need to use scontrol: @@ -34,27 +37,19 @@ def _squeue(jobid=None, username=getlogin(), full=False, sformat=None): #pyli sopt = ["scontrol", "show", "job"] # Submit the command - p = subprocess.Popen(sopt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) #pylint: disable=invalid-name - stdout, stderr = p.communicate() #pylint: disable=unused-variable - - sout = StringIO.StringIO(stdout) - # Nothing to strip, as scontrol provides no headers - return sout.read() + return run(sopt)[0] else: # First, get jobids that belong to that username using # squeue (-h strips the header) sopt = ["squeue", "-h", "-u", username] - q = subprocess.Popen(sopt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) #pylint: disable=invalid-name - stdout, stderr = q.communicate() #pylint: disable=unused-variable - - qsout = StringIO.StringIO(stdout) + qsout = run(sopt)[0] # Get the jobids jobid = [] - for line in qsout: + for line in StringIO(qsout): jobid += [line.rstrip("\n")] # Great, now we have some jobids to pass along @@ -66,11 +61,7 @@ def _squeue(jobid=None, username=getlogin(), full=False, sformat=None): #pyli sreturn = "" for my_id in jobid: sopt = opt + [str(my_id)] - - q = subprocess.Popen(sopt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) #pylint: disable=invalid-name - stdout, stderr = q.communicate() #pylint: disable=unused-variable - - sreturn = sreturn + stdout + "\n" + sreturn = sreturn + run(sopt)[0] + "\n" return sreturn @@ -92,13 +83,7 @@ def _squeue(jobid=None, username=getlogin(), full=False, sformat=None): #pyli else: sopt += ["-o", "'%i %j %u %M %t %P'"] - q = subprocess.Popen(sopt, stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT) #pylint: disable=invalid-name - stdout, stderr = q.communicate() #pylint: disable=unused-variable - - sout = StringIO.StringIO(stdout) - - # return the remaining text - return sout.read() + return run(sopt)[0] ### Required ### @@ -158,9 +143,8 @@ def job_id(all=False, name=None): #pylint: disable=redefined-builtin """ if all or name is not None: jobid = [] - stdout = _squeue() - sout = StringIO.StringIO(stdout) - for line in sout: + sout = _squeue() + for line in StringIO(sout): if name is not None: if line.split()[3] == name: jobid.append((line.split()[0]).split(".")[0]) @@ -225,14 +209,22 @@ def job_status(jobid=None): """ status = dict() - stdout = _squeue(jobid=jobid, full=True) - sout = StringIO.StringIO(stdout) - -### TODO: figure out why jobstatus is being initialized as a None vs as a dict() and then checked for content ### pylint: disable=fixme - # jobstatus = None - jobstatus = {"jobid" : None, "name" : None, "nodes" : None, "procs" : None, "walltime" : None, "qstatstr" : None, "elapsedtime" : None, "starttime" : None, "completiontime" : None, "jobstatus" : None, "cluster": None} - - for line in sout: + sout = _squeue(jobid=jobid, full=True) + + jobstatus = { + "jobid" : None, + "name" : None, + "nodes" : None, + "procs" : None, + "walltime" : None, + "qstatstr" : None, + "elapsedtime" : None, + "starttime" : None, + "completiontime" : None, + "jobstatus" : None, + "cluster": None} + + for line in StringIO(sout): # Check for if we're at a new job header line m = re.search(r"JobId=\s*(\S*)\s*", line) #pylint: disable=invalid-name if m: @@ -322,11 +314,15 @@ def job_status(jobid=None): return status -def submit(substr): +def submit(substr, write_submit_script=None): """Submit a job using ``sbatch``. Args: substr (str): The submit script string + write_submit_script (bool, optional): If true, submit via file skipping + lines containing '#SBATCH -J'; otherwise, submit via commandline. 
If + not specified, uses ``prisms_jobs.config['write_submit_script']``. + Returns: str: ID of submitted job @@ -335,20 +331,35 @@ def submit(substr): JobsError: If a submission error occurs """ - m = re.search(r"-J\s+(.*)\s", substr) #pylint: disable=invalid-name + m = re.search(r"#SBATCH\s+-J\s+(.*)\s", substr) #pylint: disable=invalid-name if m: jobname = m.group(1) #pylint: disable=unused-variable else: raise JobsError( None, - r"Error in prisms_jobs.misc.submit(). Jobname (\"-N\s+(.*)\s\") not found in submit string.") - - p = subprocess.Popen( #pylint: disable=invalid-name - "sbatch", stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - stdout, stderr = p.communicate(input=substr) #pylint: disable=unused-variable - print stdout[:-1] + r"""Error in pbs.misc.submit(). Jobname ("#SBATCH\s+-J\s+(.*)\s") not found in submit string.""") + + if write_submit_script is None: + write_submit_script = prisms_jobs.config['write_submit_script'] + + if write_submit_script: + if os.path.exists(jobname): + index = 0 + while os.path.exists(jobname + ".bak." + str(index)): + index += 1 + print("Backing up existing submit script:", jobname, "->", jobname + ".bak." + str(index)) + os.rename(jobname, jobname + ".bak." 
+ str(index)) + # write submit script, without -N line + with open(jobname, 'w') as f: + for line in substr.splitlines(): + if not re.search(r"SBATCH\s+-J\s+(.*)", line): + f.write(line + '\n') + stdout, stderr, returncode = run(["sbatch", jobname]) #pylint: disable=unused-variable + else: + stdout, stderr, returncode = run(["sbatch"], input=substr, stdin=subprocess.PIPE) #pylint: disable=unused-variable + print(stdout[:-1]) if re.search("error", stdout): - raise JobsError(0, "prisms_jobs submission error.\n" + stdout + "\n" + stderr) + raise JobsError(0, "Submission error.\n" + stdout + "\n" + stderr) else: jobid = stdout.rstrip().split()[-1] return jobid @@ -363,10 +374,7 @@ def delete(jobid): int: ``scancel`` returncode """ - p = subprocess.Popen( #pylint: disable=invalid-name - ["scancel", jobid], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - stdout, stderr = p.communicate() #pylint: disable=unused-variable - return p.returncode + return run(["scancel", jobid])[2] def hold(jobid): """``scontrol`` delay a job. @@ -378,10 +386,7 @@ def hold(jobid): int: ``scontrol`` returncode """ - p = subprocess.Popen( #pylint: disable=invalid-name - ["scontrol", "update", "JobId=", jobid, "StartTime=", "now+30days"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - stdout, stderr = p.communicate() #pylint: disable=unused-variable - return p.returncode + return run(["scontrol", "update", "JobId=", jobid, "StartTime=", "now+30days"])[2] def release(jobid): """``scontrol`` un-delay a job. @@ -393,10 +398,7 @@ def release(jobid): int: ``scontrol`` returncode """ - p = subprocess.Popen( #pylint: disable=invalid-name - ["scontrol", "update", "JobId=", jobid, "StartTime=", "now"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - stdout, stderr = p.communicate() #pylint: disable=unused-variable - return p.returncode + return run(["scontrol", "update", "JobId=", jobid, "StartTime=", "now"])[2] def alter(jobid, arg): """``scontrol`` update job. 
@@ -408,10 +410,7 @@ def alter(jobid, arg): Returns: int: ``scontrol`` returncode """ - p = subprocess.Popen( #pylint: disable=invalid-name - ["scontrol", "update", "JobId=", jobid] + arg.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - stdout, stderr = p.communicate() #pylint: disable=unused-variable - return p.returncode + return run(["scontrol", "update", "JobId=", jobid] + arg.split())[2] def read(job, qsubstr): """Raise exception""" diff --git a/prisms_jobs/interface/torque.py b/prisms_jobs/interface/torque.py index c8a8ba1..d396764 100644 --- a/prisms_jobs/interface/torque.py +++ b/prisms_jobs/interface/torque.py @@ -1,31 +1,35 @@ """ Misc functions for interfacing between torque and the prisms_jobs module """ +from __future__ import (absolute_import, division, print_function, unicode_literals) +from builtins import * -import subprocess +import datetime import os -import StringIO import re -import datetime -import time +import subprocess import sys -from prisms_jobs import JobsError -from prisms_jobs.misc import getlogin, seconds +import time + from distutils.spawn import find_executable +from io import StringIO +from six import iteritems, string_types + +from prisms_jobs import JobsError +from prisms_jobs.misc import getlogin, run, seconds ### Internal ### + def _getversion(): - """Returns the torque version or 0. if no ``qstat`` """ + """Returns the torque version as string or None if no ``qstat`` """ if find_executable("qstat") is None: - return 0. 
+ return None opt = ["qstat", "--version"] # call 'qstat' using subprocess - p = subprocess.Popen(opt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) #pylint: disable=invalid-name - stdout, stderr = p.communicate() #pylint: disable=unused-variable - sout = StringIO.StringIO(stdout) - + stdout = run(opt)[0] + # return the version number - return float(sout.read().rstrip("\n").lower().lstrip("version: ").split(".")[0]) + return stdout.rstrip("\n").lower().lstrip("version: ") torque_version = _getversion() @@ -37,24 +41,22 @@ def _qstat(jobid=None, username=getlogin(), full=False): 'full' is the '-f' option 'jobid' is a string or list of strings of job ids - Returns the text of qstat, minus the header lines + Returns: + str: the text of qstat, minus the header lines """ # -u and -f contradict in earlier versions of Torque - if full and username is not None and (torque_version < 5.0 and jobid is None): + if full and username is not None and int(torque_version.split(".")[0]) < 5 and jobid is None: # First get all jobs by the user qopt = ["qselect"] qopt += ["-u", username] # Call 'qselect' using subprocess - q = subprocess.Popen(qopt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) #pylint: disable=invalid-name - stdout, stderr = q.communicate() #pylint: disable=unused-variable - - qsout = StringIO.StringIO(stdout) + stdout = run(qopt)[0] # Get the jobids jobid = [] - for line in qsout: + for line in StringIO(stdout): jobid += [line.rstrip("\n")] opt = ["qstat"] @@ -68,22 +70,18 @@ def _qstat(jobid=None, username=getlogin(), full=False): if full: opt += ["-f"] if jobid is not None: - if isinstance(jobid, str) or isinstance(jobid, unicode): + if isinstance(jobid, string_types): jobid = [jobid] elif isinstance(jobid, list): pass else: - print "Error in prisms_jobs.interface.torque.qstat(). type(jobid):", type(jobid) + print("Error in prisms_jobs.interface.torque.qstat(). 
type(jobid):", type(jobid)) sys.exit() opt += jobid # call 'qstat' using subprocess - # print opt - p = subprocess.Popen(opt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) #pylint: disable=invalid-name - stdout, stderr = p.communicate() #pylint: disable=unused-variable - - sout = StringIO.StringIO(stdout) - + stdout, stderr, returncode = run(opt) #pylint: disable=unused-variable + sout = StringIO(stdout) # strip the header lines if full is False: for line in sout: @@ -149,8 +147,7 @@ def job_id(all=False, name=None): #pylint: disable=redefined-builtin if all or name is not None: jobid = [] stdout = _qstat() - sout = StringIO.StringIO(stdout) - for line in sout: + for line in StringIO(stdout): if name is not None: if line.split()[3] == name: jobid.append((line.split()[0]).split(".")[0]) @@ -215,13 +212,10 @@ def job_status(jobid=None): """ status = dict() - stdout = _qstat(jobid=jobid, full=True) - sout = StringIO.StringIO(stdout) - -### TODO: figure out why jobstatus is being initialized as a None vs as a dict() and then checked for content ### pylint: disable=fixme + sout = _qstat(jobid=jobid, full=True) jobstatus = None - for line in sout: + for line in StringIO(sout): m = re.search(r"Job Id:\s*(.*)\s", line) #pylint: disable=invalid-name if m: @@ -307,11 +301,14 @@ def job_status(jobid=None): return status -def submit(substr): +def submit(substr, write_submit_script=False): """Submit a job using ``qsub``. Args: substr (str): The submit script string + write_submit_script (bool, optional): If true, submit via file skipping + lines containing '#PBS -N'; otherwise, submit via commandline. If + not specified, uses ``prisms_jobs.config['write_submit_script']``. 
Returns: str: ID of submitted job @@ -320,20 +317,35 @@ def submit(substr): JobsError: If a submission error occurs """ - m = re.search(r"-N\s+(.*)\s", substr) #pylint: disable=invalid-name + m = re.search(r"#PBS\s+-N\s+(.*)\s", substr) #pylint: disable=invalid-name if m: jobname = m.group(1) #pylint: disable=unused-variable else: raise JobsError( None, - r"Error in prisms_jobs.misc.submit(). Jobname (\"-N\s+(.*)\s\") not found in submit string.") + r"""Error in pbs.misc.submit(). Jobname ("#PBS\s+-N\s+(.*)\s") not found in submit string.""") - p = subprocess.Popen( #pylint: disable=invalid-name - "qsub", stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - stdout, stderr = p.communicate(input=substr) #pylint: disable=unused-variable - print stdout[:-1] + if write_submit_script is None: + write_submit_script = prisms_jobs.config['write_submit_script'] + + if write_submit_script: + if os.path.exists(jobname): + index = 0 + while os.path.exists(jobname + ".bak." + str(index)): + index += 1 + print("Backing up existing submit script:", jobname, "->", jobname + ".bak." + str(index)) + os.rename(jobname, jobname + ".bak." 
+ str(index)) + # write submit script, without -N line + with open(jobname, 'w') as f: + for line in substr.splitlines(): + if not re.search(r"#PBS\s+-N\s+(.*)", line): + f.write(line + '\n') + stdout, stderr, returncode = run(["qsub", jobname]) #pylint: disable=unused-variable + else: + stdout, stderr, returncode = run(["qsub"], input=substr, stdin=subprocess.PIPE) #pylint: disable=unused-variable + print(stdout[:-1]) if re.search("error", stdout): - raise JobsError(0, "prisms_jobs submission error.\n" + stdout + "\n" + stderr) + raise JobsError(0, "Submission error.\n" + stdout + "\n" + stderr) else: jobid = stdout.split(".")[0] return jobid @@ -348,10 +360,8 @@ def delete(jobid): int: ``qdel`` returncode """ - p = subprocess.Popen( #pylint: disable=invalid-name - ["qdel", jobid], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - stdout, stderr = p.communicate() #pylint: disable=unused-variable - return p.returncode + stdout, stderr, returncode = run(["qdel", jobid]) #pylint: disable=unused-variable + return returncode def hold(jobid): """``qhold`` a job. @@ -363,10 +373,8 @@ def hold(jobid): int: ``qhold`` returncode """ - p = subprocess.Popen( #pylint: disable=invalid-name - ["qhold", jobid], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - stdout, stderr = p.communicate() #pylint: disable=unused-variable - return p.returncode + stdout, stderr, returncode = run(["qhold", jobid]) #pylint: disable=unused-variable + return returncode def release(jobid): """``qrls`` a job. @@ -378,10 +386,8 @@ def release(jobid): int: ``qrls`` returncode """ - p = subprocess.Popen( #pylint: disable=invalid-name - ["qrls", jobid], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - stdout, stderr = p.communicate() #pylint: disable=unused-variable - return p.returncode + stdout, stderr, returncode = run(["qrls", jobid]) #pylint: disable=unused-variable + return returncode def alter(jobid, arg): """``qalter`` a job. 
@@ -393,10 +399,8 @@ def alter(jobid, arg): Returns: int: ``qalter`` returncode """ - p = subprocess.Popen( #pylint: disable=invalid-name - ["qalter"] + arg.split() + [jobid], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - stdout, stderr = p.communicate() #pylint: disable=unused-variable - return p.returncode + stdout, stderr, returncode = run(["qalter"] + arg.split() + [jobid]) #pylint: disable=unused-variable + return returncode def read(job, qsubstr): #pylint: disable=too-many-branches, too-many-statements """ @@ -414,7 +418,7 @@ def read(job, qsubstr): #pylint: disable=too-many-branches, too-many-statemen qsubstr (str): A submit script as a string """ - s = StringIO.StringIO(qsubstr) #pylint: disable=invalid-name + s = StringIO(qsubstr) #pylint: disable=invalid-name job.pmem = None job.email = None @@ -520,7 +524,7 @@ def read(job, qsubstr): #pylint: disable=too-many-branches, too-many-statemen job.auto = True optional["auto"] = job.auto else: - print "Error in prisms_jobs.Job().read(). '#auto=' argument not understood:", line + print("Error in prisms_jobs.Job().read(). '#auto=' argument not understood:", line) sys.exit() m = re.search(r"cd\s+\$PBS_O_WORKDIR\s+", line) #pylint: disable=invalid-name @@ -532,24 +536,24 @@ def read(job, qsubstr): #pylint: disable=too-many-branches, too-many-statemen # end for # check for required arguments - for k in required.keys(): + for k in required: if required[k] == "Not Found": - print "Error in prisms_jobs.Job.read(). Not all required arguments were found.\n" + print("Error in prisms_jobs.Job.read(). 
Not all required arguments were found.\n") # print what we found: - print "Optional arguments:" - for k, v in optional.iteritems(): #pylint: disable=invalid-name - print k + ":", v - print "\nRequired arguments:" - for k, v in required.iteritems(): #pylint: disable=invalid-name + print("Optional arguments:") + for k, v in iteritems(optional): #pylint: disable=invalid-name + print(k + ":", v) + print("\nRequired arguments:") + for k, v in iteritems(required): #pylint: disable=invalid-name if k == "command": - print k + ":" - print "--- Begin command ---" - print v - print "--- End command ---" + print(k + ":") + print("--- Begin command ---") + print(v) + print("--- End command ---") else: - print k + ":", v + print(k + ":", v) sys.exit() # end if diff --git a/prisms_jobs/job.py b/prisms_jobs/job.py index 109c720..e5575fb 100644 --- a/prisms_jobs/job.py +++ b/prisms_jobs/job.py @@ -1,14 +1,16 @@ """ Class for individual Job objects """ +from __future__ import (absolute_import, division, print_function, unicode_literals) +from builtins import * + ### External ### # import subprocess -import re import os +import re import sys ### Local ### import prisms_jobs -import jobdb -import misc +from prisms_jobs import config, jobdb, misc class Job(object): #pylint: disable=too-many-instance-attributes """Represents a computational job @@ -170,8 +172,8 @@ def __init__(self, name="STDIN", account=None, nodes=None, ppn=None, walltime=No # def sub_string(self): #pylint: disable=too-many-branches - """ Output Job as a string suitable for prisms_jobs.software """ - return prisms_jobs.software.sub_string(self) + """ Output Job as a string suitable for prisms_jobs.config.software() """ + return config.software().sub_string(self) def script(self, filename="submit.sh"): """ @@ -186,7 +188,7 @@ def script(self, filename="submit.sh"): def submit(self, add=True, dbpath=None): """ - Submit this Job using the appropriate command for prisms_jobs.software. 
+ Submit this Job using the appropriate command for prisms_jobs.config.software(). Args: add (bool): Should this job be added to the JobDB database? @@ -197,7 +199,7 @@ def submit(self, add=True, dbpath=None): """ - self.jobID = prisms_jobs.software.submit(substr=self.sub_string()) + self.jobID = config.software().submit(substr=self.sub_string()) if add: db = jobdb.JobDB(dbpath=dbpath) #pylint: disable=invalid-name @@ -213,12 +215,12 @@ def submit(self, add=True, dbpath=None): def read(self, qsubstr): #pylint: disable=too-many-branches, too-many-statements """ Set this Job object from string representing a submit script appropriate - for the prisms_jobs.software. + for the config.software(). Args: qsubstr (str): A submit script as a string """ - prisms_jobs.software.read(self, qsubstr) + config.software().read(self, qsubstr) diff --git a/prisms_jobs/jobdb.py b/prisms_jobs/jobdb.py index 7d95057..df1f171 100644 --- a/prisms_jobs/jobdb.py +++ b/prisms_jobs/jobdb.py @@ -1,17 +1,24 @@ """ JobDB class and associated functions and methods """ #pylint: disable=too-many-lines +from __future__ import (absolute_import, division, print_function, unicode_literals) +from builtins import * -import sqlite3 +import json import os -import sys +import re import socket +import sqlite3 +import sys import time -import re -import json import warnings +from six import iteritems, string_types + import prisms_jobs -import prisms_jobs.misc as misc +from prisms_jobs import config, misc + +def trunc(data, maxlen): + return (data[:maxlen-2] + '..') if len(data) > maxlen else data class JobDBError(Exception): """ Custom error class for JobDB""" @@ -34,51 +41,6 @@ def __str__(self): return self.jobid + ": " + self.msg -def _default_update_selection(curs): - """Select jobs with jobstatus!='C'""" - curs.execute("SELECT jobid FROM jobs WHERE jobstatus!='C'") - -def _check_hostname_update_selection(curs): - """Select jobs with jobstatus!='C' and matching hostname""" - hostname = socket.gethostname() 
- - # Parse our hostname so we can only select jobs from THIS host - # Otherwise, if we're on a multiple-clusters-same-home setup, - # we may incorrectly update jobs from one cluster onto the other - m = re.search(r"(.*?)(?=[^a-zA-Z0-9]*login.*)", hostname) #pylint: disable=invalid-name - if m: - hostname_regex = m.group(1) + ".*" - else: - hostname_regex = hostname + ".*" - - curs.execute("SELECT jobid FROM jobs WHERE jobstatus!='C' AND hostname REGEXP ?", - (hostname_regex, )) - -def set_update_selection_method(user_override=None): - """Enable customization of which jobs are selected for JobDB.update() - - Args: - user_override (str, optional): - - =================== ======================================================= - 'default' (or None) Select jobs with jobstatus != 'C' - 'check_hostname' Select jobs with jobstatus != 'C' and matching hostname - =================== ======================================================= - - Raises: - prisms_jobs.JobDBError: For unexpected value. - """ - global update_selection_method - if user_override is None or user_override.lower() == 'default': - update_selection_method = _default_update_selection - elif user_override.lower() == 'check_hostname': - update_selection_method = _check_hostname_update_selection - else: - raise JobDBError('Unrecognized PRISMS_JOBS_UPDATE: ' + user_override) - -set_update_selection_method(user_override = os.environ.get('PRISMS_JOBS_UPDATE', None)) - - # columns in database (see job_status_dict()): # username, hostname, jobid, jobname, rundir, jobstatus, auto, taskstatus, # continuation_jobid, qsubstr, qstatstr, nodes, proc, walltime, starttime, @@ -164,7 +126,7 @@ def job_status_type_dict(): It is used to create the JobDB SQL table. 
""" status = job_status_dict() - for k in status.keys(): + for k in status: status[k] = "text" status["auto"] = "integer" @@ -186,7 +148,7 @@ def sql_create_str(): """Returns a string for SQL CREATE TABLE""" status_type = job_status_type_dict() s = "(" #pylint: disable=invalid-name - for k in status_type.keys(): + for k in status_type: s += k + " " + status_type[k] + ", " #pylint: disable=invalid-name return s[:-2] + ")" @@ -197,7 +159,7 @@ def sql_insert_str(job_status): colstr = "(" questionstr = "(" val = [] - for k in job_status.keys(): + for k in job_status: colstr = colstr + k + ", " questionstr = questionstr + "?, " val.append(job_status[k]) @@ -206,6 +168,22 @@ def sql_insert_str(job_status): return colstr, questionstr, tuple(val) +class CompatibilityRow(object): + """Python2/3 compatibility wrapper of sqlite3.Row""" + def __init__(self, row): + self._row = row + + def __getitem__(self, key): + if sys.version_info < (3,) and isinstance(key, unicode): + key = key.encode('utf-8') + return self._row[key] + + def keys(self): + return self._row.keys() + + def __str__(self): + return str(self._row) + def sql_iter(curs, arraysize=1000): """ Iterate over the results of a SELECT statement """ while True: @@ -214,7 +192,7 @@ def sql_iter(curs, arraysize=1000): break else: for r in records: #pylint: disable=invalid-name - yield r + yield CompatibilityRow(r) def regexp(pattern, string): @@ -224,18 +202,13 @@ def regexp(pattern, string): class JobDB(object): #pylint: disable=too-many-instance-attributes, too-many-public-methods """A primsms_jobs Job Database object + Usually this is called without arguments (prisms_jobs.JobDB()) to open or + create a database in the default location. + Args: - dbpath (str, optional): Path to JobDB sqlite database. + dbpath (str, optional): Path to JobDB sqlite database. By default, + uses ``prisms_jobs.config.dbpath()``. 
- Usually this is called without arguments (prisms_jobs.JobDB()) to open or create a - database in the default location. - - If dbpath is not given: - * If the ``PRISMS_JOBS_DB`` environment variable exists, set dbpath to - ``$PRISMS_JOBS_DB/jobs.db`` - * Else, set dbpath to "$HOME/.prisms_jobs/jobs.db", where $HOME is the user's - home directory - """ def __init__(self, dbpath=None): @@ -253,36 +226,16 @@ def connect(self, dbpath=None): #pylint: disable=too-many-branches, too-many- """Open a connection to the jobs database. Args: - dbpath (str): path to a JobDB database file. - - If dbpath is not given: - * If PRISMS_JOBS_DB environment variable exists, set dbpath to "$PRISMS_JOBS_DB/jobs.db" file. - * Else, set dbpath to "$HOME/.prisms_jobs/jobs.db", where $HOME is the user's home directory + dbpath (str, optional): path to a JobDB database file. By default, + uses ``prisms_jobs.config.dbpath()``. """ if dbpath is None: - if "PRISMS_JOBS_DB" in os.environ: - dbpath = os.environ("PRISMS_JOBS_DB") - if not os.path.isdir(dbpath): - print "Error in prisms_jobs.jobdb.JobDB.connect()." - print " PRISMS_JOBS_DB:", dbpath - print " Does not exist" - sys.exit() - else: - dbpath = os.path.join(os.environ["HOME"], ".prisms_jobs") - if not os.path.isdir(dbpath): - print "Creating directory:", dbpath - os.mkdir(dbpath) - dbpath = os.path.join(dbpath, "jobs.db") - else: - if not os.path.isfile(dbpath): - print ("Error in prisms_jobs.jobdb.JobDB.connect(). 
argument dbpath =", - dbpath, "is not a file.") - sys.exit() + dbpath = config.dbpath() if not os.path.isfile(dbpath): - print "Creating Database:", dbpath + print("Creating Database:", dbpath) self.conn = sqlite3.connect(dbpath) self.conn.row_factory = sqlite3.Row self.conn.create_function("REGEXP", 2, regexp) @@ -335,7 +288,7 @@ def update(self): # update jobstatus # * this method can be configured/customized via set_update_selection_method - update_selection_method(self.curs) + config.update_selection_method()(self.curs) # newstatus will contain the updated info newstatus = dict() @@ -345,13 +298,13 @@ def update(self): newstatus[f["jobid"]] = "C" # get job_status dict for all jobs found with qstat - active_status = prisms_jobs.software.job_status() + active_status = config.software().job_status() # reset untracked self.untracked = [] # collect job status - for k in active_status.keys(): + for k in active_status: if k in newstatus: newstatus[k] = active_status[k] else: @@ -360,7 +313,7 @@ def update(self): self.untracked.append(active_status[k]) # update database with latest job status - for key, jobstatus in newstatus.iteritems(): + for key, jobstatus in iteritems(newstatus): if jobstatus == "C": self.curs.execute( "UPDATE jobs SET jobstatus=?, elapsedtime=?, modifytime=? WHERE jobid=?", @@ -390,8 +343,8 @@ def update(self): def select_job(self, jobid): """Return record (sqlite3.Row object) for one job with given jobid.""" - if not isinstance(jobid, str) and not isinstance(jobid, unicode): - print "Error in prisms_jobs.JobDB.select_job(). type(id):", type(jobid), "expected str." + if not isinstance(jobid, string_types): + print("Error in prisms_jobs.JobDB.select_job(). type(id):", type(jobid), "expected str.") sys.exit() self.curs.execute("SELECT * FROM jobs WHERE jobid=?", (jobid,)) @@ -403,7 +356,7 @@ def select_job(self, jobid): raise JobDBError("Error in prisms_jobs.JobDB.select_job(). 
" + str(len(r)) + " records with jobid: '" + jobid + "' found.") - return r[0] + return CompatibilityRow(r[0]) def select_series(self, jobid): @@ -426,8 +379,8 @@ def select_parent(self, jobid): The parent is the job with continuation_jobid = given jobid """ - if not isinstance(jobid, str) and not isinstance(jobid, unicode): - print "Error in prisms_jobs.JobDB.select_parent(). type(id):", type(jobid), "expected str." + if not isinstance(jobid, string_types): + print("Error in prisms_jobs.JobDB.select_parent(). type(id):", type(jobid), "expected str.") sys.exit() self.curs.execute("SELECT * FROM jobs WHERE continuation_jobid=?", (jobid,)) @@ -440,7 +393,7 @@ def select_parent(self, jobid): " records with continuation_jobid:", jobid, " found.") sys.exit() - return r[0] + return CompatibilityRow(r[0]) def select_child(self, jobid): @@ -465,7 +418,7 @@ def select_child(self, jobid): len(r), " records with child jobid:", r["continuation_jobid"], " found.") sys.exit() - return r[0] + return CompatibilityRow(r[0]) @@ -659,7 +612,7 @@ def continue_job(self, jobid=None, job=None): wd = os.getcwd() #pylint: disable=invalid-name os.chdir(job["rundir"]) - new_jobid = prisms_jobs.software.submit(substr=job["qsubstr"]) + new_jobid = config.software().submit(substr=job["qsubstr"]) self.curs.execute("UPDATE jobs SET taskstatus='Continued', modifytime=?,\ continuation_jobid=? 
WHERE jobid=?", @@ -723,7 +676,7 @@ def abort_job(self, jobid=None, job=None): if not eligible: raise EligibilityError(id, msg) - prisms_jobs.software.delete(job["jobid"]) + config.software().delete(job["jobid"]) self.curs.execute("UPDATE jobs SET taskstatus='Aborted', modifytime=?\ WHERE jobid=?", (int(time.time()), job["jobid"])) self.conn.commit() @@ -761,7 +714,7 @@ def delete_job(self, jobid=None, job=None, series=False): jobseries = [job["jobid"]] for j in jobseries: - prisms_jobs.software.delete(j) + config.software().delete(j) self.curs.execute("DELETE from jobs WHERE jobid=?", (j, )) self.conn.commit() @@ -916,8 +869,8 @@ def _print_record(self, r): #pylint: disable=invalid-name, no-self-use d[k] = misc.strftimedelta(d[k]) print ("{0:<12} {1:<24} {2:^5} {3:^5} {4:>12} {5:^1} {6:>12} {7:<24} {8:^1} {9:<12}" - .format(d["jobid"], d["jobname"], d["nodes"], d["procs"], d["walltime"], - d["jobstatus"], d["elapsedtime"], d["taskstatus"], d["auto"], + .format(d["jobid"], trunc(d["jobname"],24), d["nodes"], d["procs"], d["walltime"], + d["jobstatus"], d["elapsedtime"], trunc(d["taskstatus"],24), d["auto"], d["continuation_jobid"])) @@ -927,16 +880,16 @@ def _print_full_record(self, r): #pylint: disable=invalid-name, no-self-use Args: r (dict): a dict-like object """ - print "#Record:" - for key in r.keys(): - if isinstance(r[key], (str, unicode)): + print("#Record:") + for key in r: + if isinstance(r[key], string_types): s = "\"" + r[key] + "\"" #pylint: disable=invalid-name if re.search("\n", s): s = "\"\"" + s + "\"\"" #pylint: disable=invalid-name - print key, "=", s + print(key, "=", s) else: - print key, "=", r[key] - print "" + print(key, "=", r[key]) + print("") def print_job(self, jobid=None, job=None, full=False, series=False): @@ -961,7 +914,7 @@ def print_job(self, jobid=None, job=None, full=False, series=False): else: for r in series: #pylint: disable=invalid-name self._print_record(r) - print "" + print("") else: if job is None: job = 
self.select_job(jobid) @@ -1011,7 +964,7 @@ def print_untracked(self, full=False): full (bool): If True, print as key:val pair list, If (default) False, print single row summary in 'qstat' style. """ - print "Untracked:" + print("Untracked:") if not full: self.print_header() sort = sorted(self.untracked, key=lambda rec: rec["jobid"]) @@ -1035,7 +988,7 @@ def print_all(self, full=False, series=False): series (bool): If True, print records as groups of auto submitting job series. If (default) False, print in order found. """ - print "Tracked:" + print("Tracked:") self.curs.execute("SELECT * FROM jobs") if not full: self.print_header() @@ -1053,7 +1006,7 @@ def print_active(self, full=False, series=False): series (bool): If True, print records as groups of auto submitting job series. If (default) False, print in order found. """ - print "Tracked:" + print("Tracked:") self.curs.execute("SELECT * FROM jobs WHERE taskstatus!='Complete'\ AND taskstatus!='Aborted' AND taskstatus!='Continued'") if not full: @@ -1081,7 +1034,7 @@ def complete_job(jobid=None, dbpath=None,): db = JobDB(dbpath) #pylint: disable=invalid-name if jobid is None: - jobid = prisms_jobs.software.job_id() + jobid = config.software().job_id() if jobid is None: raise prisms_jobs.JobsError(0, "Could not determine jobid") @@ -1109,7 +1062,7 @@ def error_job(message, jobid=None, dbpath=None): """ db = JobDB(dbpath) #pylint: disable=invalid-name if jobid is None: - jobid = prisms_jobs.software.job_id() + jobid = config.software().job_id() if jobid is None: raise prisms_jobs.JobsError(0, "Could not determine jobid") diff --git a/prisms_jobs/misc.py b/prisms_jobs/misc.py index 9db822d..359a561 100644 --- a/prisms_jobs/misc.py +++ b/prisms_jobs/misc.py @@ -1,11 +1,63 @@ -""" Misc functions for interacting between the OS and the pbs module """ +""" Misc functions for interacting between the OS and the prisms_jobs """ +from __future__ import (absolute_import, division, print_function, unicode_literals) +from 
builtins import * -import subprocess -import os -import StringIO import datetime +import os +import subprocess import sys +def _set_encoding(encoding=None): + if encoding is None: + if sys.stdout.encoding is not None: + return sys.stdout.encoding + else: + return 'utf-8' + else: + return encoding + +def _decode(val, encoding=None): + try: + if isinstance(val, bytes): + return val.decode(_set_encoding(encoding)) + else: + return val + except Exception as e: + print("Exception in prisms_jobs.misc._decode:", e) + print("val:", val) + print("sys.stdout.encoding:", sys.stdout.encoding) + raise e + +def run(cmd, input=None, stdin=None, encoding=None): + """Run subprocess and return stdout, stderr as text, returncode as int + + Args: + cmd (List[str]): Command to run as subprocess + input (str): Data to be sent to child process + stdin (stream): Use subprocess.PIPE to pass data via stdin + encoding (str, optional): Encoding to use to decode stdout, stderr. By + default, uses sys.stdout.encoding if available, else 'utf-8'. + + Returns: + (stdout, stderr, returncode): With stdout and stderr as strings, and + returncode as int + """ + try: + p = subprocess.Popen(cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + encoding = _set_encoding(encoding) + if input is not None: + input = bytearray(input, encoding=encoding) + stdout, stderr = p.communicate(input=input) + return (_decode(stdout, encoding), _decode(stderr, encoding), p.returncode) + except Exception as e: + print("Exception in prisms_jobs.misc.run:", e) + print("cmd:", cmd) + print("input:", input) + print("stdin:", stdin) + print("encoding:", encoding) + print("sys.stdout.encoding:", sys.stdout.encoding) + raise e + def getlogin(): """Returns os.getlogin(), else os.environ["LOGNAME"], else "?" 
""" try: @@ -30,7 +82,7 @@ def seconds(walltime): + float(wtime[1])*60.0 + float(wtime[2])) else: - print "Error in walltime format:", walltime + print("Error in walltime format:", walltime) sys.exit() def hours(walltime): @@ -48,7 +100,7 @@ def hours(walltime): + float(wtime[1])/60.0 + float(wtime[2])/3600.0) else: - print "Error in walltime format:", walltime + print("Error in walltime format:", walltime) sys.exit() def strftimedelta(seconds): #pylint: disable=redefined-outer-name diff --git a/prisms_jobs/scripts/__init__.py b/prisms_jobs/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/pstat b/prisms_jobs/scripts/pstat.py old mode 100755 new mode 100644 similarity index 85% rename from scripts/pstat rename to prisms_jobs/scripts/pstat.py index 28c2a5d..2297476 --- a/scripts/pstat +++ b/prisms_jobs/scripts/pstat.py @@ -1,5 +1,6 @@ -#!/usr/bin/env python """Print or modify PRISMS_JOBS job and task status.""" +from __future__ import (absolute_import, division, print_function, unicode_literals) +from builtins import * ### External ### # import sys @@ -12,7 +13,7 @@ DESC = \ """ -Print or modify PRISMS_JOBS job and task status. +Print or modify `prisms-jobs` job and task status. By default, 'pstat' prints status for select jobs. Jobs are @@ -28,33 +29,34 @@ unless the --force option is given. -Job status is as given by PRISMS_JOBS for a single job ('C', 'R', +Job status is as given by `prisms-jobs` for a single job ('C', 'R', 'Q', etc.). Task status is user-defined and defines the status of a single job within a possible series of jobs comprising some task. 'Auto' jobs may be re-submitted with the --continue option. -Please see: - https://github.com/prisms-center/prisms_jobs -for more information about 'auto' jobs. -Possible values for task status are: - - "Complete": Job and task are complete. - - "Incomplete": Job or task are incomplete. 
+Jobs are marked 'auto' either by submitting through the python +class ``prisms_jobs.Job`` with the attribute ``auto=True``, or +by submitting a script which contains the line ``#auto=True`` +via ``psub``. - "Continued": Job is complete, but task was not complete. In - this case, 'continuation_jobid' is set with - the jobid for the next job in the series of - jobs comprising some task. - - "Check": Non-auto job is complete and requires user - input for status. - - "Error:.*": Some kind of error was noted. +Possible values for task status are: - "Aborted": The job and task have been aborted. ++------------+------------------------------------------------+ +|"Complete" |Job and task are complete. | ++------------+------------------------------------------------+ +|"Incomplete"|Job or task are incomplete. | ++------------+------------------------------------------------+ +|"Continued" |Job is complete, but task was not complete. | ++------------+------------------------------------------------+ +|"Check" |Non-auto job is complete and requires user | +| |input for status. | ++------------+------------------------------------------------+ +|"Error:.*" |Some kind of error was noted. | ++------------+------------------------------------------------+ +|"Aborted" |The job and task have been aborted. 
| ++------------+------------------------------------------------+ """ def make_parser(): @@ -107,7 +109,7 @@ def make_parser(): def main(): #pylint: disable=missing-docstring, too-many-statements -# functions + # functions def select_job(args): #pylint: disable=redefined-outer-name """ Select which jobs to operate on """ @@ -172,13 +174,13 @@ def operate(args, check_eligibility, operation, summary_msg, prompt_msg, action_ if eligible: job.append(selected_job) else: - print id + ":", msg + print(id + ":", msg) except prisms_jobs.JobDBError as e: #pylint: disable=invalid-name - print e + print(e) # print jobs to operate on: - print summary_msg + print(summary_msg) db.print_header() @@ -191,12 +193,12 @@ def operate(args, check_eligibility, operation, summary_msg, prompt_msg, action_ else: # prompt user for confirmation while answer != "yes" and answer != "no": - answer = raw_input(prompt_msg) + answer = input(prompt_msg) # perform operation if answer == "yes" and job != []: # or args.select: for j in job: - print action_msg, j["jobid"] + print(action_msg, j["jobid"]) operation(job=j) @@ -222,17 +224,17 @@ def print_data(args): for s in series: #pylint: disable=invalid-name try: job = db.select_job(s) - print s, job[args.key[0]] + print(s, job[args.key[0]]) except prisms_jobs.JobDBError as e: #pylint: disable=invalid-name - print e - print "" + print(e) + print("") else: for j in jobid: try: job = db.select_job(j) - print j, job[args.key[0]] + print(j, job[args.key[0]]) except prisms_jobs.JobDBError as e: #pylint: disable=invalid-name - print e + print(e) def print_jobs(args): @@ -241,14 +243,14 @@ def print_jobs(args): # 'pstat --all' case # show all and untracked db.print_all(full=args.full, series=args.series) - print '\n' + print('\n') db.print_untracked(full=args.full) elif (not args.all and not args.range and not args.recent and not args.regex and args.job == []): # default 'pstat' case with no selection # show active and untracked 
db.print_active(full=args.full, series=args.series) - print '\n' + print('\n') db.print_untracked(full=args.full) else: # user defined selection (don't show untracked) @@ -268,7 +270,7 @@ def print_jobs(args): try: db.print_job(jobid=j, full=args.full, series=args.series) except prisms_jobs.JobDBError as e: #pylint: disable=invalid-name - print e + print(e) parser = make_parser() @@ -276,12 +278,12 @@ def print_jobs(args): -# open the Job database + # open the Job database db = prisms_jobs.JobDB() #pylint: disable=invalid-name db.update() -# perform an operation, or print jobs + # perform an operation, or print jobs if args.complete: operate(args, \ db.eligible_to_complete, \ @@ -329,7 +331,7 @@ def print_jobs(args): else: print_jobs(args) -# close the database + # close the database db.close() if __name__ == "__main__": diff --git a/scripts/psub b/prisms_jobs/scripts/psub.py old mode 100755 new mode 100644 similarity index 68% rename from scripts/psub rename to prisms_jobs/scripts/psub.py index 9e73c5d..35fd58a --- a/scripts/psub +++ b/prisms_jobs/scripts/psub.py @@ -1,4 +1,6 @@ -#!/usr/bin/env python +"""Submit jobs and store in the jobs database""" +from __future__ import (absolute_import, division, print_function, unicode_literals) +from builtins import * # This script submits a PBS script, as with 'qsub script.sh' # and adds the job to the prisms_jobs.JobDB job database @@ -8,12 +10,15 @@ import prisms_jobs -parser = argparse.ArgumentParser(description='Submit a script and add to prisms_jobs database') +parser = argparse.ArgumentParser(description='Submit a script and add to `prisms-jobs` database') parser.add_argument('scriptname', type=str, help='Submit script') -if __name__ == "__main__": +def main(): args = parser.parse_args() substr = open(args.scriptname, 'r').read() job = prisms_jobs.Job(substr=substr) job.submit() + +if __name__ == "__main__": + main() diff --git a/prisms_jobs/scripts/taskmaster.py b/prisms_jobs/scripts/taskmaster.py new file mode 
100644 index 0000000..704923a --- /dev/null +++ b/prisms_jobs/scripts/taskmaster.py @@ -0,0 +1,118 @@ +"""Automatically resubmit jobs""" +from __future__ import (absolute_import, division, print_function, unicode_literals) +from builtins import * + +import argparse +import sys +import subprocess +from six import iteritems + +import prisms_jobs +from prisms_jobs import config +software = config.software() + +def check_for_other(): + jobid = software.job_id(name="taskmaster") + if not len(jobid): + return + tmaster_status = software.job_status(jobid) + for j in jobid: + if j != software.job_id() and tmaster_status[j]["jobstatus"] != "C": + print("A taskmaster is already running. JobID:", j, " Status:", tmaster_status[j]["jobstatus"]) + sys.exit() + +DESC = \ +""" +Automatically resubmit jobs. + +'taskmaster' submits itself with instructions to be run after an amount of time +specified by --delay (default=15:00). When it runs, it continues all auto +prisms_jobs jobs in the database that are incomplete and then re-submits itself +to execute again after the specified delay. + +The specifics of 'taskmaster' submission can be customized by editing the +'taskmaster_job_kwargs' object in the prisms_jobs configuration file: +``$PRISMS_JOBS_DIR/config.json``. +""" + +parser = argparse.ArgumentParser(description=DESC, formatter_class=argparse.RawTextHelpFormatter) +parser.add_argument('-d','--delay', type=str, default="15:00", \ + help='How long to delay ("[[[DD:]HH:]MM:]SS") between executions. 
Default is "15:00".') + +group = parser.add_mutually_exclusive_group() +group.add_argument('--hold', action='store_true', help='Place a hold on the currently running taskmaster') +group.add_argument('--release', action='store_true', help='Release the currently running taskmaster') +group.add_argument('--kill', action='store_true', help='Kill the currently running taskmaster') + + +def taskmaster_job_kwargs(delay, cli_args): + + # default args + data = { + 'name': "taskmaster", + 'account': "prismsprojectdebug_fluxoe", + 'nodes': "1", + 'ppn': "1", + 'walltime': "1:00:00", + 'pmem': "3800mb", + 'qos': "flux", + 'queue': "fluxoe", + 'message': None, + 'email': None, + 'priority': "-1000", + 'command': "rm taskmaster.o*; rm taskmaster.e*\n", + 'auto': False} + + settings = config.settings() + if 'taskmaster_job_kwargs' not in settings: + settings['taskmaster_job_kwargs'] = data + config.configure(settings) + config.write_config() + else: + for key, value in iteritems(settings['taskmaster_job_kwargs']): + data[key] = value + + data['exetime'] = prisms_jobs.misc.exetime(delay) + data['command'] += "\ntaskmaster " + ' '.join(cli_args) + + return data + + +def main(): + args = parser.parse_args() + + if args.hold: + jobid = software.job_id(name="taskmaster") + if len(jobid) != 0: + software.hold(jobid[-1]) + elif args.release: + jobid = software.job_id(name="taskmaster") + if len(jobid) != 0: + software.release(jobid[-1]) + elif args.kill: + jobid = software.job_id(name="taskmaster") + if len(jobid) != 0: + software.alter(jobid[-1], "-a " + prisms_jobs.misc.exetime("10:00:00:00") ) + software.delete(jobid[-1]) + else: + + # check if taskmaster already running (besides this one) + check_for_other() + + # continue jobs + db = prisms_jobs.JobDB() + db.update() + db.continue_all() + db.close() + + # submit taskmaster + print("submit taskmaster") + j = prisms_jobs.Job(**taskmaster_job_kwargs(args.delay, sys.argv[1:])) + j.submit(add=False) + + #print "submit string:" + #print 
j.sub_string() + + +if __name__ == "__main__": + main() diff --git a/prisms_jobs/templates.py b/prisms_jobs/templates.py index 8de5ed9..e7d4bf2 100644 --- a/prisms_jobs/templates.py +++ b/prisms_jobs/templates.py @@ -1,5 +1,8 @@ -import job, misc +from __future__ import (absolute_import, division, print_function, unicode_literals) +from builtins import * + import sys +from prisms_jobs import job, misc def PrismsJob( name = "STDIN", \ nodes = "1", \ @@ -19,15 +22,15 @@ def PrismsJob( name = "STDIN", \ """ if int(nodes)*int(ppn) > 1000: - print "Error in PrismsJob(). Requested more than 1000 cores." + print("Error in PrismsJob(). Requested more than 1000 cores.") sys.exit() if int(ppn) > 16: - print "Error in PrismsJob(). Requested more than 16 ppn." + print("Error in PrismsJob(). Requested more than 16 ppn.") sys.exit() if misc.hours(walltime) > 48.0: - print "Error in PrismsJob(). Requested more than 48 hrs walltime." + print("Error in PrismsJob(). Requested more than 48 hrs walltime.") sys.exit() j = job.Job( name = name, \ @@ -65,15 +68,15 @@ def NonPrismsJob( name = "STDIN", \ """ if int(nodes)*int(ppn) > 1000: - print "Error in NonPrismsJob(). Requested more than 1000 cores." + print("Error in NonPrismsJob(). Requested more than 1000 cores.") sys.exit() if int(ppn) > 16: - print "Error in NonPrismsJob(). Requested more than 16 ppn." + print("Error in NonPrismsJob(). Requested more than 16 ppn.") sys.exit() if misc.hours(walltime) > 48.0: - print "Error in NonPrismsJob(). Requested more than 48 hrs walltime." + print("Error in NonPrismsJob(). Requested more than 48 hrs walltime.") sys.exit() j = job.Job( name = name, \ @@ -111,15 +114,15 @@ def PrismsPriorityJob( name = "STDIN", \ """ if int(nodes)*int(ppn) > 1000: - print "Error in PrismsPriorityJob(). Requested more than 1000 cores." + print("Error in PrismsPriorityJob(). Requested more than 1000 cores.") sys.exit() if int(ppn) > 16: - print "Error in PrismsPriorityJob(). Requested more than 16 ppn." 
+ print("Error in PrismsPriorityJob(). Requested more than 16 ppn.") sys.exit() if misc.hours(walltime) > 48.0: - print "Error in PrismsPriorityJob(). Requested more than 48 hrs walltime." + print("Error in PrismsPriorityJob(). Requested more than 48 hrs walltime.") sys.exit() j = job.Job( name = name, \ @@ -156,15 +159,15 @@ def PrismsDebugJob( name = "STDIN", \ """ if int(nodes)*int(ppn) > 80: - print "Error in PrismsDebugJob(). Requested more than 80 cores." + print("Error in PrismsDebugJob(). Requested more than 80 cores.") sys.exit() if int(ppn) > 16: - print "Error in PrismsDebugJob(). Requested more than 16 ppn." + print("Error in PrismsDebugJob(). Requested more than 16 ppn.") sys.exit() if misc.hours(walltime) > 6.0: - print "Error in PrismsDebugJob(). Requested more than 6 hrs walltime." + print("Error in PrismsDebugJob(). Requested more than 6 hrs walltime.") sys.exit() j = job.Job( name = name, \ @@ -205,7 +208,7 @@ def PrismsSpecialJob( name = "STDIN", \ """ if int(ppn) > 16: - print "Error in PrismsPriorityJob(). Requested more than 16 ppn." + print("Error in PrismsPriorityJob(). Requested more than 16 ppn.") sys.exit() j = job.Job( name = name, \ diff --git a/requirements.txt b/requirements.txt index 5445d77..dd68eb2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -argparse; python_version<2.7 +future +six diff --git a/scripts/taskmaster b/scripts/taskmaster deleted file mode 100755 index 9b2a19a..0000000 --- a/scripts/taskmaster +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python -import argparse -import sys -import subprocess - -import prisms_jobs - - -def check_for_other(): - jobid = prisms_jobs.software.job_id(name="taskmaster") - if not len(jobid): - return - tmaster_status = prisms_jobs.software.job_status(jobid) - for j in jobid: - if j != prisms_jobs.software.job_id() and tmaster_status[j]["jobstatus"] != "C": - print "A taskmaster is already running. 
JobID:", j, " Status:", tmaster_status[j]["jobstatus"] - sys.exit() - - -parser = argparse.ArgumentParser(description='Automatically resubmit jobs') -parser.add_argument('-d','--delay', type=str, default="15:00", \ - help='How long to delay ("[[[DD:]HH:]MM:]SS") between executions. Default is "15:00".') - -group = parser.add_mutually_exclusive_group() -group.add_argument('--hold', action='store_true', help='Place a hold on the currently running taskmaster') -group.add_argument('--release', action='store_true', help='Release the currently running taskmaster') -group.add_argument('--kill', action='store_true', help='Kill the currently running taskmaster') - - -if __name__ == "__main__": - - args = parser.parse_args() - - if args.hold: - jobid = prisms_jobs.software.job_id(name="taskmaster") - if len(jobid) != 0: - prisms_jobs.software.hold(jobid[-1]) - elif args.release: - jobid = prisms_jobs.software.job_id(name="taskmaster") - if len(jobid) != 0: - prisms_jobs.software.release(jobid[-1]) - elif args.kill: - jobid = prisms_jobs.software.job_id(name="taskmaster") - if len(jobid) != 0: - prisms_jobs.software.alter(jobid[-1], "-a " + prisms_jobs.misc.exetime("10:00:00:00") ) - prisms_jobs.software.delete(jobid[-1]) - else: - - # check if taskmaster already running (besides this one) - check_for_other() - - # continue jobs - db = prisms_jobs.JobDB() - db.update() - db.continue_all() - db.close() - - # submit taskmaster - print "submit taskmaster" - j = prisms_jobs.templates.PrismsDebugJob(nodes="1", ppn="1", name="taskmaster", \ - exetime=prisms_jobs.misc.exetime(args.delay), auto=False, message=None, \ - command="taskmaster " + ' '.join(sys.argv[1:])) - j.submit(add=False) - - #print "submit string:" - #print j.sub_string() diff --git a/setup.py b/setup.py index 84f1178..6912ce3 100644 --- a/setup.py +++ b/setup.py @@ -1,21 +1,34 @@ -from setuptools import setup import glob +import os +from setuptools import setup, find_packages from prisms_jobs import __version__ + +# 
get console_scripts +def script_str(file): + name = os.path.splitext(os.path.split(file)[1])[0] + return name + '=prisms_jobs.scripts.' + name + ':main' +console_scripts = [script_str(x) for x in glob.glob('prisms_jobs/scripts/*') if x != 'prisms_jobs/scripts/__init__.py'] + setup(name='prisms_jobs', \ version=__version__, \ description='Job submission and management', \ author='PRISMS Center and CASM developers', author_email='casm-developers@lists.engr.ucsb.edu', url='https://prisms-center.github.io/prisms_jobs_docs/', \ - packages=['prisms_jobs', 'prisms_jobs.interface'], - install_requires=['argparse;python_version<"2.7"'], - scripts=glob.glob('scripts/*'), + packages=find_packages(), + entry_points={ + 'console_scripts': console_scripts + }, + python_requires='>=2.7', + install_requires=['future', 'six'], license='LGPL2.1+', classifiers=[ 'Development Status :: 4 - Beta', 'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)', 'Programming Language :: Python', 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', 'Topic :: Scientific/Engineering', 'Topic :: System :: Distributed Computing' - ]) + ], + data_files = [('', ['LICENSE', 'requirements.txt'])])