Skip to content

Commit

Permalink
allow graceful failure of workflows when not debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
tclose committed Feb 17, 2025
1 parent dcecbf3 commit c422033
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 88 deletions.
97 changes: 19 additions & 78 deletions new-docs/source/examples/glm.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"from pathlib import Path\n",
"\n",
"from pydra.design import python, workflow\n",
"from pydra.engine.submitter import Submitter\n",
"from fileformats.generic import File, Directory\n",
"from fileformats.text import Csv\n",
"import pandas as pd\n",
Expand Down Expand Up @@ -588,8 +589,10 @@
"source": [
"wf = FullWorkflow(output_dir=workflow_out_dir, n_subjects=1, contrast='StopSuccess - Go')\n",
"\n",
"if False:\n",
" results = wf(plugin='cf', n_procs=4)\n",
"if __name__ == \"__main__\":\n",
" with Submitter(worker='cf', n_procs=4) as sub:\n",
" results = sub(wf)\n",
"\n",
" print(results)"
]
},
Expand Down Expand Up @@ -630,7 +633,7 @@
},
"outputs": [],
"source": [
"!ls ../outputs/6_glm"
"! ls ../outputs/6_glm"
]
},
{
Expand All @@ -641,14 +644,6 @@
"### Plot figures"
]
},
{
"cell_type": "markdown",
"id": "dad22ca7",
"metadata": {},
"source": [
"#### First level contrast"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -662,73 +657,19 @@
"source": [
"from IPython.display import Image\n",
"\n",
"Image(filename='../outputs/6_glm/firstlevel_contrast.jpg')"
]
},
{
"cell_type": "markdown",
"id": "0cdfcc29",
"metadata": {},
"source": [
"#### Nilearn Z map"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f08aa59f",
"metadata": {
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"Image(filename='../outputs/6_glm/nilearn_z_map.jpg')"
]
},
{
"cell_type": "markdown",
"id": "ca1b896f",
"metadata": {},
"source": [
"#### FSL Z map"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7d18b6ed",
"metadata": {
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"Image(filename='../outputs/6_glm/fsl_z_map.jpg')"
]
},
{
"cell_type": "markdown",
"id": "fc68e7dc",
"metadata": {},
"source": [
"#### Nilearn FSL comparison"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a37679ff",
"metadata": {
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"Image(filename='../outputs/6_glm/nilearn_fsl_comp.jpg')"
"\n",
"if not results.errored:\n",
" # First-level contrast\n",
" Image(filename='../outputs/6_glm/firstlevel_contrast.jpg')\n",
"\n",
" # Nilearn Z map\n",
" Image(filename='../outputs/6_glm/nilearn_z_map.jpg')\n",
"\n",
" # FSL Z map\n",
" Image(filename='../outputs/6_glm/fsl_z_map.jpg')\n",
"\n",
" # Nilearn and FSL comparison\n",
" Image(filename='../outputs/6_glm/nilearn_fsl_comp.jpg')"
]
},
{
Expand Down
45 changes: 35 additions & 10 deletions pydra/engine/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@

if ty.TYPE_CHECKING:
from .node import Node
from .specs import TaskDef, WorkflowDef, TaskHooks
from .specs import TaskDef, TaskOutputs, WorkflowDef, TaskHooks, Result
from .environments import Environment
from .state import State

DefType = ty.TypeVar("DefType", bound="TaskDef")
OutputType = ty.TypeVar("OutputType", bound="TaskOutputs")

# Used to flag development mode of Audit
develop = False
Expand Down Expand Up @@ -167,14 +168,34 @@ def worker(self):

def __call__(
self,
task_def: "TaskDef",
name: str | None = "task",
task_def: "TaskDef[OutputType]",
hooks: "TaskHooks | None" = None,
):
"""Submitter run function."""
raise_errors: bool | None = None,
) -> "Result[OutputType]":
"""Submitter run function.
if name is None:
name = "task"
Parameters
----------
task_def : :obj:`~pydra.engine.specs.TaskDef`
The task definition to run
hooks : :obj:`~pydra.engine.specs.TaskHooks`, optional
Task hooks, callable functions called as the task is setup and torn down,
by default no functions are called at the hooks
raise_errors : bool, optional
Whether to raise errors, by default True if the 'debug' worker is used,
otherwise False
Returns
-------
result : Any
The result of the task
"""
if raise_errors is None:
raise_errors = self.worker_name == "debug"
if not isinstance(raise_errors, bool):
raise TypeError(
f"'raise_errors' must be a boolean or None, not {type(raise_errors)}"
)

task_def._check_rules()
# If the outer task is split, create an implicit workflow to hold the split nodes
Expand All @@ -198,7 +219,7 @@ def Split(defn: TaskDef, output_types: dict):
task = Task(
task_def,
submitter=self,
name=name,
name="task",
environment=self.environment,
hooks=hooks,
)
Expand All @@ -211,11 +232,15 @@ def Split(defn: TaskDef, output_types: dict):
else:
self.worker.run(task, rerun=self.rerun)
except Exception as e:
e.add_note(
msg = (
f"Full crash report for {type(task_def).__name__!r} task is here: "
+ str(task.output_dir / "_error.pklz")
)
raise e
if raise_errors:
e.add_note(msg)
raise e
else:
logger.error("\nTask execution failed\n" + msg)
finally:
self.run_start_time = None
PersistentCache().clean_up()
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ doc = [
"ipython",
"ipykernel",
"ipywidgets",
"matplotlib",
"nbsphinx",
"nest_asyncio",
"nibabel",
Expand Down

0 comments on commit c422033

Please sign in to comment.