Skip to content

Commit

Permalink
fix: conform executors
Browse files Browse the repository at this point in the history
  • Loading branch information
andrzejnovak committed Mar 14, 2022
1 parent b5ab6f1 commit 26bf2e8
Showing 1 changed file with 26 additions and 20 deletions.
46 changes: 26 additions & 20 deletions coffea/processor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,12 +497,15 @@ def __call__(
if self.x509_proxy is None:
self.x509_proxy = _get_x509_proxy()

return work_queue_main(
items,
function,
accumulator,
**self.__dict__,
), 0
return (
work_queue_main(
items,
function,
accumulator,
**self.__dict__,
),
0,
)


@dataclass
Expand Down Expand Up @@ -921,14 +924,17 @@ def belongsto(heavy_input, workerindex, item):

# FIXME: fancy widget doesn't appear, have to live with boring pbar
progress(work, multi=True, notebook=False)
return accumulate(
[
work.result()
if self.compression is None
else _decompress(work.result())
],
accumulator,
), 0
return (
accumulate(
[
work.result()
if self.compression is None
else _decompress(work.result())
],
accumulator,
),
0,
)
except KilledWorker as ex:
baditem = key_to_item[ex.task]
if self.heavy_input is not None and isinstance(baditem, tuple):
Expand Down Expand Up @@ -1581,14 +1587,14 @@ def __call__(
)
else:
processor_instance.postprocess(wrapped_out["out"])

_return = (wrapped_out["out"],)
if hasattr(self.executor, "recoverable") and self.executor.recoverable:
_return = *_return, list(wrapped_out["processed"])
if self.savemetrics and not self.use_dataframes:
wrapped_out["metrics"]["chunks"] = len(chunks)
return (
wrapped_out["out"],
list(wrapped_out["processed"]),
wrapped_out["metrics"],
)
return wrapped_out["out"], list(wrapped_out["processed"])
_return = *_return, wrapped_out["metrics"]
return _return


def run_spark_job(
Expand Down

0 comments on commit 26bf2e8

Please sign in to comment.