Add support for group jobs.
riga committed Mar 10, 2023
1 parent 8f2d086 commit 9cd02d9
Showing 2 changed files with 238 additions and 87 deletions.
247 changes: 174 additions & 73 deletions law/job/base.py
@@ -25,7 +25,7 @@

from law.config import Config
from law.target.file import get_scheme
-from law.util import colored, make_list, iter_chunks, flatten, makedirs, create_hash, empty_context
+from law.util import colored, make_list, iter_chunks, makedirs, create_hash, empty_context
from law.logger import get_logger


@@ -101,6 +101,13 @@ class BaseJobManager(six.with_metaclass(ABCMeta, object)):
        A dictionary that defines the coloring styles per job status that are used in
        :py:meth:`status_line`.

+    .. py:classattribute:: job_grouping
+
+        type: bool
+
+        Whether this manager implementation groups jobs into single interactions for submission and
+        status queries. In general, this means that the submission of a single job file can result
+        in multiple jobs on the remote batch system.

    .. py:classattribute:: chunk_size_submit

        type: int
@@ -143,6 +150,9 @@ class BaseJobManager(six.with_metaclass(ABCMeta, object)):
        FAILED: ({}, {}, {"color": "red", "style": "bright"}),
    }

+    # job grouping settings
+    job_grouping = False
+
    # chunking settings for unbatched methods
    # disabled by default
    chunk_size_submit = 0
@@ -177,45 +187,49 @@ def __init__(self, status_names=None, status_diff_styles=None, threads=1):
    @abstractmethod
    def submit(self):
        """
-        Abstract atomic job submission.
+        Abstract atomic or group job submission.
        Can throw exceptions.
        Should return a list of job ids.
        """
        return

    @abstractmethod
    def cancel(self):
        """
-        Abstract atomic job cancellation.
+        Abstract atomic or group job cancellation.
        Can throw exceptions.
        Should return a dictionary mapping job ids to per-job return values.
        """
        return

    @abstractmethod
    def cleanup(self):
        """
-        Abstract atomic job cleanup.
+        Abstract atomic or group job cleanup.
        Can throw exceptions.
        Should return a dictionary mapping job ids to per-job return values.
        """
        return

    @abstractmethod
    def query(self):
        """
-        Abstract atomic job status query.
+        Abstract atomic or group job status query.
        Can throw exceptions.
        Should return a dictionary mapping job ids to per-job return values.
        """
        return
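
For illustration, a minimal toy manager satisfying these contracts could look as follows; the DummyJobManager name, signatures, and trivial bookkeeping are hypothetical and not part of this commit, while make_list and the status constants come from law itself:

# Hypothetical sketch, not part of this diff: a toy manager that honors the
# documented return contracts of the four abstract methods.
from law.job.base import BaseJobManager
from law.util import make_list

class DummyJobManager(BaseJobManager):

    def submit(self, job_file, **kwargs):
        # must return a list of job ids; a group submission may return several
        return ["{}_job_0".format(job_file)]

    def cancel(self, job_id, **kwargs):
        # must map job ids to per-job return values; job_id may be a group (list)
        return {j: None for j in make_list(job_id)}

    def cleanup(self, job_id, **kwargs):
        return {j: None for j in make_list(job_id)}

    def query(self, job_id, **kwargs):
        # FINISHED is one of the status constants defined on BaseJobManager
        return {j: self.FINISHED for j in make_list(job_id)}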

-    def submit_batch(self, job_files, threads=None, chunk_size=None, callback=None, **kwargs):
+    def group_job_ids(self, job_ids):
        """
-        Submits a batch of jobs given by *job_files* via a thread pool of size *threads* which
-        defaults to its instance attribute. When *chunk_size*, which defaults to
-        :py:attr:`chunk_size_submit`, is not negative, *job_files* are split into chunks of that
-        size which are passed to :py:meth:`submit`. When *callback* is set, it is invoked after each
-        successful job submission with the index of the corresponding job file (starting at 0) and
-        either the assigned job id or an exception if any occurred. All other *kwargs* are passed to
-        :py:meth:`submit`.
-        The return value is a list containing the return values of the particular :py:meth:`submit`
-        calls, in an order that corresponds to *job_files*. When an exception was raised during a
-        submission, this exception is added to the returned list.
+        Hook that needs to be implemented if the job manager supports grouping of jobs, i.e., when
+        :py:attr:`job_grouping` is *True*. If so, it should take a sequence of *job_ids* and return
+        a list of groups (again lists) of ids, with an arbitrary grouping mechanism.
        """
+        raise NotImplementedError(
+            "internal error, {}.group_job_ids not implemented".format(self.__class__.__name__),
+        )
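
A manager that opts into grouping would then set job_grouping and implement this hook. A minimal sketch, building on the hypothetical DummyJobManager from above; the group size of 100 is arbitrary, while iter_chunks is real and already imported by this module:

# Hypothetical sketch: a manager that opts into job grouping.
from law.util import iter_chunks

class GroupedJobManager(DummyJobManager):

    # enable grouped submission and status queries
    job_grouping = True

    def group_job_ids(self, job_ids):
        # any grouping mechanism works; here: fixed-size groups of at most 100 ids
        return [list(chunk) for chunk in iter_chunks(job_ids, 100)]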

+    def _apply_batch(
+        self,
+        func,
@@ -365,35 +379,6 @@ def cleanup_batch(self, job_ids, threads=None, chunk_size=None, callback=None, **kwargs):
+            callback=callback,
+            **kwargs,
+        )
-        # build chunks (either job ids one by one, or real chunks of job ids)
-        job_ids = make_list(job_ids)
-        chunks = list(iter_chunks(job_ids, chunk_size)) if chunking else job_ids
-
-        # factory to call the passed callback for each job id even when chunking
-        def cb_factory(i):
-            if not callable(callback):
-                return None
-
-            if chunking:
-                def wrapper(err):
-                    offset = sum(len(chunk) for chunk in chunks[:i])
-                    for j in range(len(chunks[i])):
-                        callback(offset + j, err)
-            else:
-                def wrapper(err):
-                    callback(i, err)
-
-            return wrapper
-
-        # threaded processing
-        pool = ThreadPool(threads)
-        results = [pool.apply_async(self.cleanup, (v,), kwargs, callback=cb_factory(i))
-            for i, v in enumerate(chunks)]
-        pool.close()
-        pool.join()
-
-        # store errors
-        errors = list(filter(bool, flatten(get_async_result_silent(res) for res in results)))

+        # return only errors
+        return [error for error in results.values() if isinstance(error, Exception)]
@@ -422,30 +407,37 @@ def query_batch(self, job_ids, threads=None, chunk_size=None, callback=None, **kwargs):
+            callback=callback,
+            **kwargs,
+        )
+
+    def _apply_group(
+        self,
+        func,
+        result_type,
+        group_ids,
+        job_objs,
+        threads=None,
+        callback=None,
+        **kwargs,
+    ):
        # default arguments
        threads = max(threads or self.threads or 1, 1)

-        # is chunking allowed?
-        if self.chunk_size_query:
-            chunk_size = max(chunk_size or self.chunk_size_query, 0)
-        else:
-            chunk_size = 0
-        chunking = chunk_size > 0
-
-        # build chunks (either job ids one by one, or real chunks of job ids)
-        job_ids = make_list(job_ids)
-        chunks = list(iter_chunks(job_ids, chunk_size)) if chunking else job_ids
+        # group objects
+        job_objs = make_list(job_objs)
+        if group_ids:
+            job_objs = self.group_job_ids(job_objs)
+            if isinstance(job_objs, dict):
+                job_objs = list(job_objs.values())

        # factory to call the passed callback for each job file even when chunking
        def cb_factory(i):
            if not callable(callback):
                return None

-            if chunking:
-                def wrapper(query_data):
-                    offset = sum(len(chunk) for chunk in chunks[:i])
-                    for j in range(len(chunks[i])):
-                        data = query_data if isinstance(query_data, Exception) else query_data[j]
+            if group_ids:
+                def wrapper(result_data):
+                    offset = sum(map(len, job_objs[:i]))
+                    for j in range(len(job_objs[i])):
+                        data = result_data if isinstance(result_data, Exception) else result_data[j]
                        callback(offset + j, data)
            else:
                def wrapper(data):
@@ -455,24 +447,133 @@ def wrapper(data):

        # threaded processing
        pool = ThreadPool(threads)
-        results = [pool.apply_async(self.query, (v,), kwargs, callback=cb_factory(i))
-            for i, v in enumerate(chunks)]
+        results = [
+            pool.apply_async(func, (arg,), kwargs, callback=cb_factory(i))
+            for i, arg in enumerate(job_objs)
+        ]
        pool.close()
        pool.join()

-        # store status data per job id or an exception
-        query_data = {}
-        if chunking:
-            for i, (chunk, res) in enumerate(six.moves.zip(chunks, results)):
-                data = get_async_result_silent(res)
-                if isinstance(data, Exception):
-                    data = {job_id: data for job_id in chunk}
-                query_data.update(data)
-        else:
-            for job_id, res in six.moves.zip(job_ids, results):
-                query_data[job_id] = get_async_result_silent(res)
-
-        return query_data
+        # store result data or an exception
+        result_data = result_type()
+        if group_ids:
+            for _job_objs, res in six.moves.zip(job_objs, results):
+                data = get_async_result_silent(res)
+                for job_obj in _job_objs:
+                    _data = data if isinstance(data, Exception) else data[job_obj]
+                    if isinstance(result_data, list):
+                        result_data.append(_data)
+                    else:
+                        result_data[job_obj] = _data
+        else:
+            for job_obj, res in six.moves.zip(job_objs, results):
+                data = get_async_result_silent(res)
+                if isinstance(result_data, list):
+                    result_data.append(data)
+                else:
+                    result_data[job_obj] = data
+
+        return result_data
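
Put differently, _apply_group expands each per-group result back into per-job entries, duplicating a group-level exception for every id of that group. A simplified, thread-free illustration of that expansion; the helper and its inputs are hypothetical and cover only dict-typed results:

# Simplified illustration of the expansion performed above (no threading).
def expand_group_results(groups, results):
    result_data = {}
    for group, data in zip(groups, results):
        for job_id in group:
            # a group-level exception is duplicated for every id in the group
            result_data[job_id] = data if isinstance(data, Exception) else data[job_id]
    return result_data

groups = [["job_1", "job_2"], ["job_3"]]
results = [RuntimeError("query failed"), {"job_3": "finished"}]
print(expand_group_results(groups, results))
# {'job_1': RuntimeError('query failed'), 'job_2': RuntimeError('query failed'), 'job_3': 'finished'}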

+    def submit_group(self, job_files, threads=None, callback=None, **kwargs):
+        """
+        Submits several job groups given by *job_files* via a thread pool of size *threads* which
+        defaults to its instance attribute. As per the definition of a job group, a single job file
+        can result in multiple jobs being processed on the remote batch system.
+        When *callback* is set, it is invoked after each successful job submission with the index of
+        the corresponding job (starting at 0) and either the assigned job id or an exception if any
+        occurred. All other *kwargs* are passed to :py:meth:`submit`.
+        The return value is a list containing the return values of the particular :py:meth:`submit`
+        calls, in an order that in general corresponds to *job_files*, with ids of single jobs per
+        job file properly expanded. When an exception was raised during a submission, this
+        exception is added to the returned list.
+        """
+        return self._apply_group(
+            func=self.submit,
+            result_type=list,
+            group_ids=False,
+            job_objs=job_files,
+            threads=threads,
+            callback=callback,
+            **kwargs,
+        )
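
Hypothetical usage, with the GroupedJobManager sketched above and made-up job file names:

# Hypothetical usage of submit_group; file names are made up.
manager = GroupedJobManager()
results = manager.submit_group(
    ["jobs_a.jdl", "jobs_b.jdl"],
    threads=2,
    callback=lambda i, res: print("submission {}: {}".format(i, res)),
)
# results holds the submit() return values (or exceptions), one entry per
# submission, in the order of the job files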

+    def cancel_group(self, job_ids, threads=None, callback=None, **kwargs):
+        """
+        Takes several *job_ids*, groups them according to :py:meth:`group_job_ids`, and cancels all
+        groups simultaneously via a thread pool of size *threads* which defaults to its instance
+        attribute.
+        When *callback* is set, it is invoked after each successful job cancellation with the index
+        of the corresponding job id (starting at 0) and either *None* or an exception if any
+        occurred. All other *kwargs* are passed to :py:meth:`cancel`.
+        Exceptions that occurred during job cancellation are stored in a list and returned. An
+        empty list means that no exceptions occurred.
+        """
+        results = self._apply_group(
+            func=self.cancel,
+            result_type=dict,
+            group_ids=True,
+            job_objs=job_ids,
+            threads=threads,
+            callback=callback,
+            **kwargs,
+        )

+        # return only errors
+        return [error for error in results.values() if isinstance(error, Exception)]

+    def cleanup_group(self, job_ids, threads=None, callback=None, **kwargs):
+        """
+        Takes several *job_ids*, groups them according to :py:meth:`group_job_ids`, and cleans up
+        all groups simultaneously via a thread pool of size *threads* which defaults to its
+        instance attribute.
+        When *callback* is set, it is invoked after each successful job cleanup with the index of
+        the corresponding job id (starting at 0) and either *None* or an exception if any occurred.
+        All other *kwargs* are passed to :py:meth:`cleanup`.
+        Exceptions that occurred during job cleanup are stored in a list and returned. An empty
+        list means that no exceptions occurred.
+        """
+        results = self._apply_group(
+            func=self.cleanup,
+            result_type=dict,
+            group_ids=True,
+            job_objs=job_ids,
+            threads=threads,
+            callback=callback,
+            **kwargs,
+        )
+
+        # return only errors
+        return [error for error in results.values() if isinstance(error, Exception)]

+    def query_group(self, job_ids, threads=None, callback=None, **kwargs):
+        """
+        Takes several *job_ids*, groups them according to :py:meth:`group_job_ids`, and queries the
+        status of all groups simultaneously via a thread pool of size *threads* which defaults to
+        its instance attribute.
+        When *callback* is set, it is invoked after each successful job status query with the index
+        of the corresponding job id (starting at 0) and the obtained status query data or an
+        exception if any occurred. All other *kwargs* are passed to :py:meth:`query`.
+        This method returns a dictionary that maps job ids to either the status query data or an
+        exception if any occurred.
+        """
+        return self._apply_group(
+            func=self.query,
+            result_type=dict,
+            group_ids=True,
+            job_objs=job_ids,
+            threads=threads,
+            callback=callback,
+            **kwargs,
+        )
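
A hypothetical round trip over the grouped methods, again with the sketched manager and made-up job ids:

# Hypothetical usage: query grouped jobs, then cancel those whose query failed.
manager = GroupedJobManager()
all_ids = ["job_1", "job_2", "job_3"]

# per-job status data or exceptions, keyed by job id
query_data = manager.query_group(all_ids)

# an empty error list from cancel_group means all cancellations succeeded
broken = [job_id for job_id, data in query_data.items() if isinstance(data, Exception)]
errors = manager.cancel_group(broken)
print(len(errors) == 0)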

    def status_line(self, counts, last_counts=None, sum_counts=None, timestamp=True, align=False,
            color=False):
