Commit
Update to use lock file when getting a new job to prevent a job being started twice. Also refactored code for run_check_outputs and run_remove_outputs so the user provides info to the constructor and therefore doesn't need to provide implementations of these functions.
petebunting committed Aug 19, 2020
1 parent 0403e40 commit cd333fa
Showing 4 changed files with 139 additions and 65 deletions.
2 changes: 1 addition & 1 deletion pbprocesstools/__init__.py
@@ -40,7 +40,7 @@
import json

PB_PROCESS_TOOLS_VERSION_MAJOR = 1
PB_PROCESS_TOOLS_VERSION_MINOR = 4
PB_PROCESS_TOOLS_VERSION_MINOR = 5
PB_PROCESS_TOOLS_VERSION_PATCH = 0

PB_PROCESS_TOOLS_VERSION = "{}.{}.{}".format(PB_PROCESS_TOOLS_VERSION_MAJOR,
194 changes: 132 additions & 62 deletions pbprocesstools/pbpt_q_process.py
@@ -61,14 +61,14 @@ class PBPTProcessJob(Base):
__tablename__ = "PBPTProcessJob"

PID = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True)
JobParams = sqlalchemy.Column(sqlalchemy.JSON, nullable=True)
JobParams = sqlalchemy.Column(sqlalchemy.PickleType, nullable=True)
Start = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True)
End = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True)
Started = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=False)
Completed = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=False)
Checked = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=False)
Error = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=False)
ErrorInfo = sqlalchemy.Column(sqlalchemy.JSON, nullable=True)
ErrorInfo = sqlalchemy.Column(sqlalchemy.PickleType, nullable=True)


@event.listens_for(Engine, "connect")
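Note: changing JobParams and ErrorInfo from sqlalchemy.JSON to sqlalchemy.PickleType means values are pickled on write and unpickled on read, so any picklable Python object (not just JSON-serialisable types) can be stored. A minimal sketch of that behaviour, separate from this commit and using a hypothetical DemoJob table (sqlalchemy.orm.declarative_base is the SQLAlchemy 1.4+ import path):

# Minimal sketch (hypothetical DemoJob table, not part of this commit):
# a PickleType column round-trips arbitrary picklable Python objects, e.g.
# a dict containing a datetime, which a JSON column cannot hold directly.
import datetime
import sqlalchemy
import sqlalchemy.orm

Base = sqlalchemy.orm.declarative_base()

class DemoJob(Base):
    __tablename__ = "DemoJob"
    PID = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True)
    JobParams = sqlalchemy.Column(sqlalchemy.PickleType, nullable=True)

db_engine = sqlalchemy.create_engine("sqlite://")  # in-memory SQLite database
Base.metadata.create_all(db_engine)
ses = sqlalchemy.orm.sessionmaker(bind=db_engine)()
ses.add(DemoJob(JobParams={'input': 'scn1.tif', 'when': datetime.datetime.now()}))
ses.commit()
print(ses.query(DemoJob).first().JobParams['when'])  # the datetime survives the round trip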
@@ -100,6 +100,7 @@ def __init__(self, queue_db_info=None, cmd_name=None, descript=None, params=None
:param params: optionally provide a dict which will be the options for the processing to execute
(e.g., the input and output files).
"""
self.queue_db_info = queue_db_info
self.cmd_name = cmd_name
@@ -354,52 +355,66 @@ def std_run(self, **kwargs):
pbpt_utils = PBPTUtils()
if self.parse_cmds(**kwargs):
if self.debug_job_id is None:
sqlite_db_file = self.queue_db_info['sqlite_db_file']
sqlite_db_dir, sqlite_db_filename = os.path.split(sqlite_db_file)
sqlite_db_conn = self.queue_db_info['sqlite_db_conn']
logger.debug("Database connection info: '{}'.".format(sqlite_db_conn))
found_job = False
# Sleep for a random period of time to minimise clashes between multiple processes so they are offset.
time.sleep(random.randint(1,10))
n_failed_lck = 0
while True:
try:
logger.debug("Creating Database Engine and Session.")
db_engine = sqlalchemy.create_engine(sqlite_db_conn, pool_pre_ping=True)
session_sqlalc = sqlalchemy.orm.sessionmaker(bind=db_engine)
ses = session_sqlalc()
logger.debug("Created Database Engine and Session.")

logger.debug("Find the next scene to process.")
job_info = ses.query(PBPTProcessJob).filter(PBPTProcessJob.Completed == False,
PBPTProcessJob.Started == False).order_by(
PBPTProcessJob.PID.asc()).first()
if job_info is not None:
found_job = True
self.job_pid = job_info.PID
self.params = job_info.JobParams
job_info.Started = True
job_info.Start = datetime.datetime.now()
ses.commit()
logger.debug("Found the next scene to process. PID: {}".format(self.job_pid))
else:
found_job = False
logger.debug("No job found to process - finishing.")
ses.close()
logger.debug("Closed Database Engine and Session.")
except Exception as e:
logger.debug("Failed to create the database connection: '{}'".format(sqlite_db_conn))
logger.exception(e)
found_job = False
if found_job:
self.check_required_fields(**kwargs)
if pbpt_utils.get_file_lock(sqlite_db_file, sleep_period=1, wait_iters=180,
use_except=False, timeout=300):
n_failed_lck = 0
try:
self.do_processing(**kwargs)
self.completed_processing(**kwargs)
logger.debug("Creating Database Engine and Session.")
db_engine = sqlalchemy.create_engine(sqlite_db_conn, pool_pre_ping=True)
session_sqlalc = sqlalchemy.orm.sessionmaker(bind=db_engine)
ses = session_sqlalc()
logger.debug("Created Database Engine and Session.")

logger.debug("Find the next scene to process.")
job_info = ses.query(PBPTProcessJob).filter(PBPTProcessJob.Completed == False,
PBPTProcessJob.Started == False).order_by(
PBPTProcessJob.PID.asc()).first()
if job_info is not None:
found_job = True
self.job_pid = job_info.PID
self.params = job_info.JobParams
job_info.Started = True
job_info.Start = datetime.datetime.now()
ses.commit()
logger.debug("Found the next scene to process. PID: {}".format(self.job_pid))
else:
found_job = False
logger.debug("No job found to process - finishing.")
ses.close()
logger.debug("Closed Database Engine and Session.")
except Exception as e:
import traceback
err_dict = dict()
err_dict['error'] = str(e)
err_dict['traceback'] = traceback.format_exc()
self.record_process_error(err_dict)
logger.debug("Failed to create the database connection: '{}'".format(sqlite_db_conn))
logger.exception(e)
found_job = False
pbpt_utils.release_file_lock(sqlite_db_file, timeout=300)
if found_job:
self.check_required_fields(**kwargs)
try:
self.do_processing(**kwargs)
self.completed_processing(**kwargs)
except Exception as e:
import traceback
err_dict = dict()
err_dict['error'] = str(e)
err_dict['traceback'] = traceback.format_exc()
self.record_process_error(err_dict)
else:
break
else:
n_failed_lck = n_failed_lck + 1
pbpt_utils.clean_file_locks(sqlite_db_dir, timeout=300)

if n_failed_lck > 10:
pbpt_utils.clean_file_locks(sqlite_db_dir, timeout=300)
break
else:
sqlite_db_conn = self.queue_db_info['sqlite_db_conn']
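The substance of the change above: the query for the next job and the update marking it as started now happen while holding a file lock on the database, so two workers cannot both claim the same row. A condensed sketch of the pattern (acquire_lock/release_lock are stand-ins for PBPTUtils.get_file_lock/release_file_lock; PBPTProcessJob is the model defined earlier in this file):

# Condensed sketch of the claim-under-lock pattern; acquire_lock/release_lock
# are stand-ins for PBPTUtils.get_file_lock/release_file_lock, and
# PBPTProcessJob is the model defined earlier in this file.
import datetime
import sqlalchemy
import sqlalchemy.orm

def claim_next_job(sqlite_db_file, sqlite_db_conn, acquire_lock, release_lock):
    """Return (PID, params) for the next unstarted job, or None if there is none.
    The file lock serialises the read-modify-write on the queue database, so
    only one worker can see Started == False for a given row."""
    if not acquire_lock(sqlite_db_file):
        return None
    try:
        db_engine = sqlalchemy.create_engine(sqlite_db_conn, pool_pre_ping=True)
        ses = sqlalchemy.orm.sessionmaker(bind=db_engine)()
        try:
            job_info = ses.query(PBPTProcessJob).filter(PBPTProcessJob.Completed == False,
                                                        PBPTProcessJob.Started == False).order_by(
                                                        PBPTProcessJob.PID.asc()).first()
            if job_info is None:
                return None
            job_info.Started = True
            job_info.Start = datetime.datetime.now()
            ses.commit()  # the row is marked as claimed before the lock is released
            return job_info.PID, job_info.JobParams
        finally:
            ses.close()
    finally:
        release_lock(sqlite_db_file)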
@@ -439,18 +454,28 @@ def std_run(self, **kwargs):

class PBPTGenQProcessToolCmds(PBPTProcessToolsBase):

def __init__(self, cmd, sqlite_db_file, uid_len=6):
def __init__(self, cmd, sqlite_db_file, uid_len=6, process_tools_path=None,
process_tools_mod=None, process_tools_cls=None):
"""
A class to implement the generation of commands for batch processing data analysis.
:param cmd: the command to be executed (e.g., python run_analysis.py).
:param sqlite_db_file: the file path for the SQLite database file (used to build the sqlite_db_conn
connection string).
:param process_tools_path: The path (if not already on the python path; i.e., the same directory) to find the
PBPTQProcessTool implementation used within this class.
:param process_tools_mod: The module name (i.e., the python file name 'xxxx.py') containing the PBPTQProcessTool
implementation.
:param process_tools_cls: The name of the class implementing PBPTQProcessTool.
"""
self.params = []
self.cmd = cmd
self.sqlite_db_file = os.path.abspath(sqlite_db_file)
self.sqlite_db_conn = "sqlite:///{}".format(self.sqlite_db_file)
self.process_tools_path = process_tools_path
self.process_tools_mod = process_tools_mod
self.process_tools_cls = process_tools_cls

super().__init__(uid_len)
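
A hypothetical usage sketch of the new constructor arguments (all script, module, and class names below are invented for illustration); previously the PBPTQProcessTool module and class had to be supplied to each check/remove call:

# Hypothetical usage sketch: the module/class of the PBPTQProcessTool
# implementation are given once to the constructor, so run_check_outputs
# and run_remove_outputs need no extra arguments.
from pbprocesstools.pbpt_q_process import PBPTGenQProcessToolCmds

class GenAnalysisCmds(PBPTGenQProcessToolCmds):

    def gen_command_info(self, **kwargs):
        for scn in ['scn1.tif', 'scn2.tif']:  # hypothetical inputs
            self.params.append({'input': scn, 'output': scn.replace('.tif', '_out.tif')})

    def run_gen_commands(self):
        # A real script would also populate the queue database here; only the
        # command generation step is shown, as that method is not in this diff.
        self.gen_command_info()

if __name__ == "__main__":
    gen_cmds = GenAnalysisCmds(cmd='python perform_analysis.py',
                               sqlite_db_file='analysis_jobs.db',
                               process_tools_path=None,  # module sits alongside this script
                               process_tools_mod='perform_analysis',
                               process_tools_cls='PerformAnalysis')
    gen_cmds.parse_cmds()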

@abstractmethod
@@ -470,19 +495,23 @@ def gen_command_info(self, **kwargs):
"""
pass

def check_job_outputs(self, process_tools_mod, process_tools_cls, out_err_pid_file, out_err_info_file, **kwargs):
def check_job_outputs(self, out_err_pid_file, out_err_info_file, process_tools_path=None,
process_tools_mod=None, process_tools_cls=None, **kwargs):
"""
A function which, following the completion of all the processing for a job, tests whether all the output
files were created (i.e., the job successfully completed).
:param process_tools_mod: the module (i.e., path to python script) containing the implementation
of the PBPTProcessTool class used for the processing to be checked.
:param process_tools_cls: the name of the class implementing the PBPTProcessTool class used
for the processing to be checked.
:param out_err_pid_file: the output file name and path for the list of database PIDs which have not
been successfully processed.
:param out_err_info_file: the output file name and path for the output error report from this function
where processing might not have fully completed.
:param process_tools_path: the path containing the implementation of the PBPTProcessTool class
used for the processing to be checked. If None then the value passed to the
constructor will be used.
:param process_tools_mod: the module (i.e., path to python script) containing the implementation
of the PBPTProcessTool class used for the processing to be checked.
:param process_tools_cls: the name of the class implementing the PBPTProcessTool class used
for the processing to be checked.
:param kwargs: allows the user to pass custom variables to the function (e.g., obj.gen_command_info(input='')),
these will be passed to the process_tools_mod outputs_present function.
@@ -493,6 +522,22 @@ def check_job_outputs(self, process_tools_mod, process_tools_cls, out_err_pid_fi
queue_db_info['sqlite_db_file'] = self.sqlite_db_file
queue_db_info['sqlite_db_conn'] = self.sqlite_db_conn

if process_tools_path is None:
process_tools_path = self.process_tools_path

if process_tools_mod is None:
if self.process_tools_mod is None:
raise Exception("A PBPTProcessTool implementation module has not been provided to the constructor.")
process_tools_mod = self.process_tools_mod

if process_tools_cls is None:
if self.process_tools_cls is None:
raise Exception("A PBPTProcessTool implementation class has not been provided to the constructor.")
process_tools_cls = self.process_tools_cls

if process_tools_path is not None:
sys.path.insert(0, process_tools_path)

process_tools_mod_inst = importlib.import_module(process_tools_mod)
if process_tools_mod_inst is None:
raise Exception("Could not load the module: '{}'".format(process_tools_mod))
@@ -544,19 +589,22 @@ def check_job_outputs(self, process_tools_mod, process_tools_cls, out_err_pid_fi
pathlib.Path(out_err_pid_file).touch()
pathlib.Path(out_err_info_file).touch()

def remove_job_outputs(self, process_tools_mod, process_tools_cls, all_jobs=False, error_jobs=False, **kwargs):
def remove_job_outputs(self, all_jobs=False, error_jobs=False, process_tools_path=None, process_tools_mod=None,
process_tools_cls=None, **kwargs):
"""
A function which removes the outputs of jobs (either all jobs or those which have
erred) so that those jobs can be rerun.
:param all_jobs: boolean specifying that outputs should be removed for all jobs.
:param error_jobs: boolean specifying that outputs should be removed for error jobs - either
logged an error or started but not finished.
:param process_tools_path: the path containing the implementation of the PBPTProcessTool class
used for the processing to be checked. If None then the value passed to the
constructor will be used.
:param process_tools_mod: the module (i.e., path to python script) containing the implementation
of the PBPTProcessTool class used for the processing to be checked.
:param process_tools_cls: the name of the class implementing the PBPTProcessTool class used
for the processing to be checked.
:param all_jobs: boolean specifying that outputs should be removed for all jobs.
:param error_jobs: boolean specifying that outputs should be removed for error jobs - either
logged an error or started but not finished.
:param job_id: int for the job id for which the outputs will be removed.
:param kwargs: allows the user to pass custom variables to the function (e.g., obj.gen_command_info(input='')),
these will be passed to the process_tools_mod outputs_present function.
@@ -570,6 +618,22 @@ def remove_job_outputs(self, process_tools_mod, process_tools_cls, all_jobs=Fals
queue_db_info['sqlite_db_file'] = self.sqlite_db_file
queue_db_info['sqlite_db_conn'] = self.sqlite_db_conn

if process_tools_path is None:
process_tools_path = self.process_tools_path

if process_tools_mod is None:
if self.process_tools_mod is None:
raise Exception("A PBPTProcessTool implementation module has not been provided to the constructor.")
process_tools_mod = self.process_tools_mod

if process_tools_cls is None:
if self.process_tools_cls is None:
raise Exception("A PBPTProcessTool implementation class has not been provided to the constructor.")
process_tools_cls = self.process_tools_cls

if process_tools_path is not None:
sys.path.insert(0, process_tools_path)

process_tools_mod_inst = importlib.import_module(process_tools_mod)
if process_tools_mod_inst is None:
raise Exception("Could not load the module: '{}'".format(process_tools_mod))
@@ -844,31 +908,37 @@ def run_gen_commands(self):
"""
pass

@abstractmethod
def run_check_outputs(self):
"""
An abstract function which needs to be implemented with the functions and inputs
you want to run to check that the outputs of the processing have been successfully completed.
A function which runs to check that the outputs of the processing have been successfully completed.
This function is executed when the user provides the --check option on the terminal.
You will presumably want to call:
 * self.check_job_outputs
This function will by default output two files:
 * processing_errs_scns_yyyymmdd.txt
 * non_complete_errs_yyyymmdd.txt
To change the output file names you will probably want to create your own version of this function
calling the self.check_job_outputs function.
"""
pass
time_sample_str = self.generate_readable_timestamp_str()
out_err_pid_file = 'processing_errs_scns_{}.txt'.format(time_sample_str)
out_err_info_file = 'non_complete_errs_{}.txt'.format(time_sample_str)
self.check_job_outputs(out_err_pid_file, out_err_info_file)

@abstractmethod
def run_remove_outputs(self, all_jobs=False, error_jobs=False):
"""
An abstract function which needs to be implemented with the functions and inputs
you want to run to remove the outputs of the processing.
A function which removes the system output files, resetting the jobs to be rerun.
This function is executed when the user provides the --rmouts option (with either --all or --error).
You will presumably want to call:
 * self.remove_job_outputs
If you want some different functionality then you may want to create your own version of this function.
:param all_jobs: remove the outputs for all jobs regardless of whether they have successfully completed or not.
:param error_jobs: only remove the outputs (which may or may not be present) from jobs which have resulted in
an error.
"""
pass
self.remove_job_outputs(all_jobs, error_jobs)

def parse_cmds(self, argv=None):
"""
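With run_check_outputs and run_remove_outputs now concrete, a typical subclass no longer implements them; you only override to change behaviour such as the output file names. A hypothetical override, extending the GenAnalysisCmds sketch above:

# Hypothetical override sketch: the default run_check_outputs builds
# timestamped file names itself; override it only when fixed names are wanted.
class GenAnalysisCmdsFixedNames(GenAnalysisCmds):

    def run_check_outputs(self):
        # Fixed names instead of processing_errs_scns_<timestamp>.txt etc.
        self.check_job_outputs('failed_job_pids.txt', 'failed_job_report.txt')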
6 changes: 5 additions & 1 deletion pbprocesstools/pbpt_utils.py
@@ -42,7 +42,7 @@

class PBPTUtils(object):

def get_file_lock(self, input_file, sleep_period=1, wait_iters=120, use_except=False):
def get_file_lock(self, input_file, sleep_period=1, wait_iters=120, use_except=False, timeout=3600):
"""
A function which gets a lock on a file.
@@ -60,6 +60,7 @@ def get_file_lock(self, input_file, sleep_period=1, wait_iters=120, use_except=F
available. If False (default) False will be returned if the lock is
not successful.
:return: boolean. True: lock was successfully gained. False: lock was not gained.
:param timeout: the time (in seconds) for the lock file timeout. Default: 3600 (1 hour).
"""
file_path, file_name = os.path.split(input_file)
@@ -80,7 +81,10 @@ def get_file_lock(self, input_file, sleep_period=1, wait_iters=120, use_except=F
f.flush()
f.close()
elif use_except:
self.clean_file_locks(file_path, timeout)
raise Exception("Lock could not be gained for file: {}".format(input_file))
else:
self.clean_file_locks(file_path, timeout)

return got_lock

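The timeout plumbing above exists so that locks abandoned by crashed processes are eventually cleared by clean_file_locks rather than blocking the queue forever. An illustrative sketch of that idea (not the PBPTUtils implementation; the lock file suffix is an assumption):

# Illustrative sketch only: delete lock files older than `timeout` seconds so
# a crashed worker cannot block the queue database indefinitely. The '.lok'
# suffix is assumed for illustration.
import glob
import os
import time

def clean_stale_locks(dir_path, timeout=3600):
    now = time.time()
    for lock_file in glob.glob(os.path.join(dir_path, '*.lok')):
        if (now - os.path.getmtime(lock_file)) > timeout:
            os.remove(lock_file)  # older than the timeout: treat as stale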
2 changes: 1 addition & 1 deletion setup.py
@@ -37,7 +37,7 @@
import os

setuptools.setup(name='pb_process_tools',
version='1.4.0',
version='1.5.0',
description='Tools for batch processing data, including on HPC cluster with slurm.',
author='Pete Bunting',
author_email='petebunting@mac.com',
