From 25c3ca5d268b5091782051753fd868523fd60202 Mon Sep 17 00:00:00 2001 From: Andrzej Date: Mon, 14 Mar 2022 21:03:20 +0100 Subject: [PATCH] fix: conform executors --- coffea/processor/executor.py | 46 ++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/coffea/processor/executor.py b/coffea/processor/executor.py index 44f7efb067..be9d88ae52 100644 --- a/coffea/processor/executor.py +++ b/coffea/processor/executor.py @@ -497,12 +497,15 @@ def __call__( if self.x509_proxy is None: self.x509_proxy = _get_x509_proxy() - return work_queue_main( - items, - function, - accumulator, - **self.__dict__, - ), 0 + return ( + work_queue_main( + items, + function, + accumulator, + **self.__dict__, + ), + 0, + ) @dataclass @@ -921,14 +924,17 @@ def belongsto(heavy_input, workerindex, item): # FIXME: fancy widget doesn't appear, have to live with boring pbar progress(work, multi=True, notebook=False) - return accumulate( - [ - work.result() - if self.compression is None - else _decompress(work.result()) - ], - accumulator, - ), 0 + return ( + accumulate( + [ + work.result() + if self.compression is None + else _decompress(work.result()) + ], + accumulator, + ), + 0, + ) except KilledWorker as ex: baditem = key_to_item[ex.task] if self.heavy_input is not None and isinstance(baditem, tuple): @@ -1581,14 +1587,14 @@ def __call__( ) else: processor_instance.postprocess(wrapped_out["out"]) + + _return = (wrapped_out["out"],) + if hasattr(self.executor, "recoverable") and self.executor.recoverable: + _return = *_return, list(wrapped_out["processed"]) if self.savemetrics and not self.use_dataframes: wrapped_out["metrics"]["chunks"] = len(chunks) - return ( - wrapped_out["out"], - list(wrapped_out["processed"]), - wrapped_out["metrics"], - ) - return wrapped_out["out"], list(wrapped_out["processed"]) + _return = *_return, wrapped_out["metrics"] + return _return if len(_return) > 1 else _return[0] def run_spark_job(