diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3a82d99c3c..08878d2708 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,11 +16,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [unreleased]
-## Added
+### Added
- `Robustfov` feature in `FSL-BET` to crop images ensuring removal of neck regions that may appear in the skull-stripped images.
+- Ability to throttle nodes, estimating all available memory when threading
-## Changed
+### Changed
- Moved autoversioning from CI to pre-commit
- Updated `FSL-BET` config to default `-mask-boolean` flag as on, and removed all removed `mask-boolean` keys from configs.
@@ -28,7 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [1.8.6] - 2024-01-15
-## Added
+### Added
- Some automatic handling of user-provided BIDSy atlas names.
- `sig_imports` static method decorator for `Function` nodes, to accommodate type hinting in signatures of `Function` node functions.
diff --git a/CPAC/generate_motion_statistics/generate_motion_statistics.py b/CPAC/generate_motion_statistics/generate_motion_statistics.py
index f8400804ef..8d918dd07d 100644
--- a/CPAC/generate_motion_statistics/generate_motion_statistics.py
+++ b/CPAC/generate_motion_statistics/generate_motion_statistics.py
@@ -207,7 +207,8 @@ def motion_power_statistics(name='motion_stats',
name='cal_DVARS',
mem_gb=0.4,
mem_x=(739971956005215 / 151115727451828646838272,
- 'in_file'))
+ 'in_file'),
+ throttle=True)
cal_DVARS_strip = pe.Node(Function(input_names=['file_1D'],
output_names=['out_file', 'DVARS_val'],
diff --git a/CPAC/nuisance/nuisance.py b/CPAC/nuisance/nuisance.py
index 97b5186f1a..c19fd26080 100644
--- a/CPAC/nuisance/nuisance.py
+++ b/CPAC/nuisance/nuisance.py
@@ -1281,8 +1281,9 @@ def create_regressor_workflow(nuisance_selectors,
output_names=['cosfiltered_img'],
function=cosine_filter,
imports=cosfilter_imports),
- name='{}_cosine_filter'.format(regressor_type),
- mem_gb=8.0)
+ name=f'{regressor_type}_cosine_filter',
+ mem_gb=8.0,
+ throttle=True)
nuisance_wf.connect(
summary_filter_input[0], summary_filter_input[1],
cosfilter_node, 'input_image_path'
diff --git a/CPAC/pipeline/nipype_pipeline_engine/engine.py b/CPAC/pipeline/nipype_pipeline_engine/engine.py
index 899f69bbcf..31092e5269 100644
--- a/CPAC/pipeline/nipype_pipeline_engine/engine.py
+++ b/CPAC/pipeline/nipype_pipeline_engine/engine.py
@@ -51,6 +51,7 @@
import re
from copy import deepcopy
from inspect import Parameter, Signature, signature
+from typing import ClassVar, Optional, Union
from nibabel import load
from nipype.interfaces.utility import Function
from nipype.pipeline import engine as pe
@@ -69,6 +70,7 @@
from traits.trait_base import Undefined
from traits.trait_handlers import TraitListObject
from CPAC.utils.monitoring.custom_logging import getLogger
+from CPAC.utils.typing import DICT
# set global default mem_gb
DEFAULT_MEM_GB = 2.0
@@ -153,7 +155,13 @@ class Node(pe.Node):
{" >>> realign.inputs.in_files = 'functional.nii'"}
)
- def __init__(self, *args, mem_gb=DEFAULT_MEM_GB, **kwargs):
+ def __init__(
+ self,
+ *args,
+ mem_gb: Optional[float] = DEFAULT_MEM_GB,
+ throttle: Optional[bool] = False,
+ **kwargs
+ ) -> None:
# pylint: disable=import-outside-toplevel
from CPAC.pipeline.random_state import random_seed
super().__init__(*args, mem_gb=mem_gb, **kwargs)
@@ -162,6 +170,8 @@ def __init__(self, *args, mem_gb=DEFAULT_MEM_GB, **kwargs):
self.seed_applied = False
self.input_data_shape = Undefined
self._debug = False
+ if throttle:
+ self.throttle = True
self.verbose_logger = None
self._mem_x = {}
if 'mem_x' in kwargs and isinstance(
@@ -195,7 +205,8 @@ def __init__(self, *args, mem_gb=DEFAULT_MEM_GB, **kwargs):
Parameter('mem_gb', Parameter.POSITIONAL_OR_KEYWORD,
default=DEFAULT_MEM_GB)
)[1] for p in orig_sig_params[:-1]] + [
- Parameter('mem_x', Parameter.KEYWORD_ONLY),
+ Parameter('mem_x', Parameter.KEYWORD_ONLY, default=None),
+ Parameter("throttle", Parameter.KEYWORD_ONLY, default=False),
orig_sig_params[-1][1]
])
@@ -232,7 +243,11 @@ def __init__(self, *args, mem_gb=DEFAULT_MEM_GB, **kwargs):
``mode`` can be any one of
* 'xyzt' (spatial * temporal) (default if not specified)
* 'xyz' (spatial)
- * 't' (temporal)'''])) # noqa: E501 # pylint: disable=line-too-long
+ * 't' (temporal)
+
+ throttle : bool, optional
+ Assume this Node will use all available memory if no observation run is
+ provided.'''])) # noqa: E501 # pylint: disable=line-too-long
def _add_flags(self, flags):
r'''
@@ -440,12 +455,21 @@ def __init__(self, *args, **kwargs):
if not self.name.endswith('_'):
self.name = f'{self.name}_'
- __init__.__signature__ = Signature(parameters=[
- p[1] if p[0] != 'mem_gb' else (
- 'mem_gb',
- Parameter('mem_gb', Parameter.POSITIONAL_OR_KEYWORD,
- default=DEFAULT_MEM_GB)
- )[1] for p in signature(pe.Node).parameters.items()])
+ _parameters: ClassVar[DICT[str, Parameter]] = {}
+ _custom_params: ClassVar[DICT[str, Union[bool, float]]] = {
+ "mem_gb": DEFAULT_MEM_GB,
+ "throttle": False,
+ }
+ for param, default in _custom_params.items():
+ for p in signature(pe.Node).parameters.items():
+ if p[0] in _custom_params:
+ _parameters[p[0]] = Parameter(
+ param, Parameter.POSITIONAL_OR_KEYWORD, default=default
+ )
+ else:
+ _parameters[p[0]] = p[1]
+ __init__.__signature__ = Signature(parameters=list(_parameters.values()))
+ del _custom_params, _parameters
class Workflow(pe.Workflow):
diff --git a/CPAC/pipeline/nipype_pipeline_engine/plugins/cpac_nipype_custom.py b/CPAC/pipeline/nipype_pipeline_engine/plugins/cpac_nipype_custom.py
index a843931783..e96f2d0e3a 100644
--- a/CPAC/pipeline/nipype_pipeline_engine/plugins/cpac_nipype_custom.py
+++ b/CPAC/pipeline/nipype_pipeline_engine/plugins/cpac_nipype_custom.py
@@ -47,6 +47,9 @@
from CPAC.utils.monitoring import log_nodes_cb
+OVERHEAD_MEMORY_ESTIMATE: float = 1 # estimate of C-PAC + Nipype overhead (GB)
+
+
def get_peak_usage():
"""Function to return peak usage in GB.
@@ -189,17 +192,19 @@ def _prerun_check(self, graph):
tasks_num_th = []
overrun_message_mem = None
overrun_message_th = None
- # estimate of C-PAC + Nipype overhead (GB):
- overhead_memory_estimate = 1
for node in graph.nodes():
if hasattr(self, 'runtime'):
self._override_memory_estimate(node)
+ elif hasattr(node, "throttle"):
+ # for a throttled node without an observation run,
+ # assume all available memory will be needed
+ node._mem_gb = self.memory_gb - OVERHEAD_MEMORY_ESTIMATE
try:
node_memory_estimate = node.mem_gb
except FileNotFoundError:
# pylint: disable=protected-access
node_memory_estimate = node._apply_mem_x(UNDEFINED_SIZE)
- node_memory_estimate += overhead_memory_estimate
+ node_memory_estimate += OVERHEAD_MEMORY_ESTIMATE
if node_memory_estimate > self.memory_gb:
tasks_mem_gb.append((node.name, node_memory_estimate))
if node.n_procs > self.processors:
diff --git a/CPAC/pipeline/test/test_nipype_pipeline_engine.py b/CPAC/pipeline/test/test_nipype_pipeline_engine.py
index f722a839cb..ef0efd282f 100644
--- a/CPAC/pipeline/test/test_nipype_pipeline_engine.py
+++ b/CPAC/pipeline/test/test_nipype_pipeline_engine.py
@@ -1,11 +1,35 @@
+# Copyright (C) 2021-2024 C-PAC Developers
+
+# This file is part of C-PAC.
+
+# C-PAC is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+
+# C-PAC is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+
+# You should have received a copy of the GNU Lesser General Public
+# License along with C-PAC. If not, see .
+"""Tests for C-PAC customizations to nipype.pipeline.engine."""
import os
import pytest
from nibabel.testing import data_path
-from nipype import Function
from nipype.interfaces.utility import IdentityInterface
+from nipype.utils.profiler import get_system_total_memory_gb
from traits.trait_base import Undefined
+
+from CPAC.nuisance.utils.compcor import cosine_filter
from CPAC.pipeline.nipype_pipeline_engine import (
DEFAULT_MEM_GB, get_data_size, Node, MapNode, Workflow)
+from CPAC.pipeline.nipype_pipeline_engine.plugins import (
+ LegacyMultiProcPlugin,
+ MultiProcPlugin,
+)
+from CPAC.utils.interfaces.function import Function
def get_sample_data(filepath):
@@ -35,6 +59,32 @@ def test_Node(): # pylint: disable=invalid-name
assert res.outputs.f_x == 4
+@pytest.mark.parametrize("plugin", [LegacyMultiProcPlugin, MultiProcPlugin])
+@pytest.mark.parametrize("throttle", [True, False])
+def test_throttle(plugin: type, throttle: bool) -> None:
+ """Test Node throttling."""
+ small_estimate = 0.0003
+ cosfilter_node = Node(
+ Function(
+ input_names=["input_image_path", "timestep"],
+ output_names=["cosfiltered_img"],
+ function=cosine_filter,
+ as_module=True,
+ ),
+ name="aCompCor_cosine_filter",
+ mem_gb=small_estimate,
+ throttle=throttle,
+ )
+ wf = Workflow(name="aCompCor_cosine_filter_wf")
+ wf.add_nodes([cosfilter_node])
+ plugin(plugin_args={"raise_insufficient": False})._prerun_check(wf._graph)
+ if throttle:
+ assert cosfilter_node.mem_gb > small_estimate
+ assert cosfilter_node.mem_gb <= get_system_total_memory_gb()
+ else:
+ assert cosfilter_node.mem_gb == small_estimate
+
+
def test_Workflow(tmpdir): # pylint: disable=invalid-name
example_filepath = os.path.join(data_path, 'example4d.nii.gz')
pass_in_filepath = IdentityInterface(fields=['x'])
diff --git a/CPAC/qc/xcp.py b/CPAC/qc/xcp.py
index 3104256c02..0f6494a69f 100644
--- a/CPAC/qc/xcp.py
+++ b/CPAC/qc/xcp.py
@@ -112,7 +112,8 @@ def _connect_motion(wf, nodes, strat_pool, qc_file, pipe_num):
name=f'cal_DVARS_{pipe_num}',
mem_gb=0.4,
mem_x=(739971956005215 / 151115727451828646838272,
- 'in_file'))
+ 'in_file'),
+ throttle=True)
cal_DVARS_strip = pe.Node(Function(input_names=['file_1D'],
output_names=['out_file'],
function=DVARS_strip_t0,
diff --git a/CPAC/registration/registration.py b/CPAC/registration/registration.py
index cd3e69444a..8658aae219 100644
--- a/CPAC/registration/registration.py
+++ b/CPAC/registration/registration.py
@@ -1209,7 +1209,8 @@ def create_wf_calculate_ants_warp(
imports=reg_imports),
name='calc_ants_warp',
mem_gb=2.8,
- mem_x=(2e-7, 'moving_brain', 'xyz'))
+ mem_x=(2e-7, 'moving_brain', 'xyz'),
+ throttle=True)
calculate_ants_warp.interface.num_threads = num_threads
diff --git a/CPAC/utils/typing.py b/CPAC/utils/typing.py
index a838a7c76b..cfb6dd0612 100644
--- a/CPAC/utils/typing.py
+++ b/CPAC/utils/typing.py
@@ -15,39 +15,48 @@
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see .
"""
-Helpers and aliases for handling typing in main and variant Python versions
+Helpers and aliases for handling typing in main and variant Python versions.
Once all variants (see {DOCS_URL_PREFIX}/user/versions#variants)
run Python ≥ 3.10, these global variables can be replaced with the
current preferred syntax.
"""
+from pathlib import Path
import sys
from typing import Union
+
from CPAC.utils.docs import DOCS_URL_PREFIX
# Set the version-specific documentation URL in the module docstring:
-__doc__ = __doc__.replace(r'{DOCS_URL_PREFIX}', DOCS_URL_PREFIX)
+__doc__ = __doc__.replace(r"{DOCS_URL_PREFIX}", DOCS_URL_PREFIX) # noqa: A001
if sys.version_info >= (3, 8):
- from typing import Literal
- LITERAL = Literal
+ from typing import Literal as LITERAL
else:
- from typing_extensions import Literal
- LITERAL = Literal
+ from typing_extensions import Literal as LITERAL
if sys.version_info >= (3, 9):
- from collections.abc import Iterable
- LIST = list
+ from builtins import dict as DICT, list as LIST
+ from collections.abc import Iterable as ITERABLE
else:
- from typing import Iterable, List
- LIST = List
+ from typing import Dict as DICT, Iterable as ITERABLE, List as LIST
if sys.version_info >= (3, 10):
+ from builtins import tuple as TUPLE
+
LIST_OR_STR = LIST[str] | str # pylint: disable=invalid-name
- TUPLE = tuple
else:
- from typing import Tuple
+ from typing import Tuple as TUPLE
+
LIST_OR_STR = Union[LIST[str], str] # pylint: disable=invalid-name
- TUPLE = Tuple
-ITERABLE = Iterable
+
+PATHSTR = Union[Path, str]
ConfigKeyType = Union[str, LIST[str]]
-__all__ = ['ConfigKeyType', 'ITERABLE', 'LIST', 'LIST_OR_STR', 'LITERAL',
- 'TUPLE']
+__all__ = [
+ "ConfigKeyType",
+ "DICT",
+ "ITERABLE",
+ "LIST",
+ "LIST_OR_STR",
+ "LITERAL",
+ "PATHSTR",
+ "TUPLE",
+]