Skip to content

Commit

Permalink
Merge pull request #2073 from FCP-INDI/throttle
Browse files Browse the repository at this point in the history
⚡️ Throttle `aCompCor_cosine_filter`
  • Loading branch information
sgiavasis authored Mar 13, 2024
2 parents 1fe21e1 + 41bba7d commit 39c2c70
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 37 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [unreleased]

## Added
### Added

- `Robustfov` feature in `FSL-BET` to crop images ensuring removal of neck regions that may appear in the skull-stripped images.
- Ability to throttle nodes, estimating all available memory when threading

## Changed
### Changed

- Moved autoversioning from CI to pre-commit
- Updated `FSL-BET` config to default `-mask-boolean` flag as on, and removed all removed `mask-boolean` keys from configs.
- Added `dvars` as optional output in `cpac_outputs`

## [1.8.6] - 2024-01-15

## Added
### Added

- Some automatic handling of user-provided BIDSy atlas names.
- `sig_imports` static method decorator for `Function` nodes, to accommodate type hinting in signatures of `Function` node functions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ def motion_power_statistics(name='motion_stats',
name='cal_DVARS',
mem_gb=0.4,
mem_x=(739971956005215 / 151115727451828646838272,
'in_file'))
'in_file'),
throttle=True)

cal_DVARS_strip = pe.Node(Function(input_names=['file_1D'],
output_names=['out_file', 'DVARS_val'],
Expand Down
5 changes: 3 additions & 2 deletions CPAC/nuisance/nuisance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1281,8 +1281,9 @@ def create_regressor_workflow(nuisance_selectors,
output_names=['cosfiltered_img'],
function=cosine_filter,
imports=cosfilter_imports),
name='{}_cosine_filter'.format(regressor_type),
mem_gb=8.0)
name=f'{regressor_type}_cosine_filter',
mem_gb=8.0,
throttle=True)
nuisance_wf.connect(
summary_filter_input[0], summary_filter_input[1],
cosfilter_node, 'input_image_path'
Expand Down
42 changes: 33 additions & 9 deletions CPAC/pipeline/nipype_pipeline_engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import re
from copy import deepcopy
from inspect import Parameter, Signature, signature
from typing import ClassVar, Optional, Union
from nibabel import load
from nipype.interfaces.utility import Function
from nipype.pipeline import engine as pe
Expand All @@ -69,6 +70,7 @@
from traits.trait_base import Undefined
from traits.trait_handlers import TraitListObject
from CPAC.utils.monitoring.custom_logging import getLogger
from CPAC.utils.typing import DICT

# set global default mem_gb
DEFAULT_MEM_GB = 2.0
Expand Down Expand Up @@ -153,7 +155,13 @@ class Node(pe.Node):
{" >>> realign.inputs.in_files = 'functional.nii'"}
)

def __init__(self, *args, mem_gb=DEFAULT_MEM_GB, **kwargs):
def __init__(
self,
*args,
mem_gb: Optional[float] = DEFAULT_MEM_GB,
throttle: Optional[bool] = False,
**kwargs
) -> None:
# pylint: disable=import-outside-toplevel
from CPAC.pipeline.random_state import random_seed
super().__init__(*args, mem_gb=mem_gb, **kwargs)
Expand All @@ -162,6 +170,8 @@ def __init__(self, *args, mem_gb=DEFAULT_MEM_GB, **kwargs):
self.seed_applied = False
self.input_data_shape = Undefined
self._debug = False
if throttle:
self.throttle = True
self.verbose_logger = None
self._mem_x = {}
if 'mem_x' in kwargs and isinstance(
Expand Down Expand Up @@ -195,7 +205,8 @@ def __init__(self, *args, mem_gb=DEFAULT_MEM_GB, **kwargs):
Parameter('mem_gb', Parameter.POSITIONAL_OR_KEYWORD,
default=DEFAULT_MEM_GB)
)[1] for p in orig_sig_params[:-1]] + [
Parameter('mem_x', Parameter.KEYWORD_ONLY),
Parameter('mem_x', Parameter.KEYWORD_ONLY, default=None),
Parameter("throttle", Parameter.KEYWORD_ONLY, default=False),
orig_sig_params[-1][1]
])

Expand Down Expand Up @@ -232,7 +243,11 @@ def __init__(self, *args, mem_gb=DEFAULT_MEM_GB, **kwargs):
``mode`` can be any one of
* 'xyzt' (spatial * temporal) (default if not specified)
* 'xyz' (spatial)
* 't' (temporal)'''])) # noqa: E501 # pylint: disable=line-too-long
* 't' (temporal)
throttle : bool, optional
Assume this Node will use all available memory if no observation run is
provided.'''])) # noqa: E501 # pylint: disable=line-too-long

def _add_flags(self, flags):
r'''
Expand Down Expand Up @@ -440,12 +455,21 @@ def __init__(self, *args, **kwargs):
if not self.name.endswith('_'):
self.name = f'{self.name}_'

__init__.__signature__ = Signature(parameters=[
p[1] if p[0] != 'mem_gb' else (
'mem_gb',
Parameter('mem_gb', Parameter.POSITIONAL_OR_KEYWORD,
default=DEFAULT_MEM_GB)
)[1] for p in signature(pe.Node).parameters.items()])
_parameters: ClassVar[DICT[str, Parameter]] = {}
_custom_params: ClassVar[DICT[str, Union[bool, float]]] = {
"mem_gb": DEFAULT_MEM_GB,
"throttle": False,
}
for param, default in _custom_params.items():
for p in signature(pe.Node).parameters.items():
if p[0] in _custom_params:
_parameters[p[0]] = Parameter(
param, Parameter.POSITIONAL_OR_KEYWORD, default=default
)
else:
_parameters[p[0]] = p[1]
__init__.__signature__ = Signature(parameters=list(_parameters.values()))
del _custom_params, _parameters


class Workflow(pe.Workflow):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
from CPAC.utils.monitoring import log_nodes_cb


OVERHEAD_MEMORY_ESTIMATE: float = 1 # estimate of C-PAC + Nipype overhead (GB)


def get_peak_usage():
"""Function to return peak usage in GB.
Expand Down Expand Up @@ -189,17 +192,19 @@ def _prerun_check(self, graph):
tasks_num_th = []
overrun_message_mem = None
overrun_message_th = None
# estimate of C-PAC + Nipype overhead (GB):
overhead_memory_estimate = 1
for node in graph.nodes():
if hasattr(self, 'runtime'):
self._override_memory_estimate(node)
elif hasattr(node, "throttle"):
# for a throttled node without an observation run,
# assume all available memory will be needed
node._mem_gb = self.memory_gb - OVERHEAD_MEMORY_ESTIMATE
try:
node_memory_estimate = node.mem_gb
except FileNotFoundError:
# pylint: disable=protected-access
node_memory_estimate = node._apply_mem_x(UNDEFINED_SIZE)
node_memory_estimate += overhead_memory_estimate
node_memory_estimate += OVERHEAD_MEMORY_ESTIMATE
if node_memory_estimate > self.memory_gb:
tasks_mem_gb.append((node.name, node_memory_estimate))
if node.n_procs > self.processors:
Expand Down
52 changes: 51 additions & 1 deletion CPAC/pipeline/test/test_nipype_pipeline_engine.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,35 @@
# Copyright (C) 2021-2024 C-PAC Developers

# This file is part of C-PAC.

# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.

# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
"""Tests for C-PAC customizations to nipype.pipeline.engine."""
import os
import pytest
from nibabel.testing import data_path
from nipype import Function
from nipype.interfaces.utility import IdentityInterface
from nipype.utils.profiler import get_system_total_memory_gb
from traits.trait_base import Undefined

from CPAC.nuisance.utils.compcor import cosine_filter
from CPAC.pipeline.nipype_pipeline_engine import (
DEFAULT_MEM_GB, get_data_size, Node, MapNode, Workflow)
from CPAC.pipeline.nipype_pipeline_engine.plugins import (
LegacyMultiProcPlugin,
MultiProcPlugin,
)
from CPAC.utils.interfaces.function import Function


def get_sample_data(filepath):
Expand Down Expand Up @@ -35,6 +59,32 @@ def test_Node(): # pylint: disable=invalid-name
assert res.outputs.f_x == 4


@pytest.mark.parametrize("plugin", [LegacyMultiProcPlugin, MultiProcPlugin])
@pytest.mark.parametrize("throttle", [True, False])
def test_throttle(plugin: type, throttle: bool) -> None:
"""Test Node throttling."""
small_estimate = 0.0003
cosfilter_node = Node(
Function(
input_names=["input_image_path", "timestep"],
output_names=["cosfiltered_img"],
function=cosine_filter,
as_module=True,
),
name="aCompCor_cosine_filter",
mem_gb=small_estimate,
throttle=throttle,
)
wf = Workflow(name="aCompCor_cosine_filter_wf")
wf.add_nodes([cosfilter_node])
plugin(plugin_args={"raise_insufficient": False})._prerun_check(wf._graph)
if throttle:
assert cosfilter_node.mem_gb > small_estimate
assert cosfilter_node.mem_gb <= get_system_total_memory_gb()
else:
assert cosfilter_node.mem_gb == small_estimate


def test_Workflow(tmpdir): # pylint: disable=invalid-name
example_filepath = os.path.join(data_path, 'example4d.nii.gz')
pass_in_filepath = IdentityInterface(fields=['x'])
Expand Down
3 changes: 2 additions & 1 deletion CPAC/qc/xcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ def _connect_motion(wf, nodes, strat_pool, qc_file, pipe_num):
name=f'cal_DVARS_{pipe_num}',
mem_gb=0.4,
mem_x=(739971956005215 / 151115727451828646838272,
'in_file'))
'in_file'),
throttle=True)
cal_DVARS_strip = pe.Node(Function(input_names=['file_1D'],
output_names=['out_file'],
function=DVARS_strip_t0,
Expand Down
3 changes: 2 additions & 1 deletion CPAC/registration/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,8 @@ def create_wf_calculate_ants_warp(
imports=reg_imports),
name='calc_ants_warp',
mem_gb=2.8,
mem_x=(2e-7, 'moving_brain', 'xyz'))
mem_x=(2e-7, 'moving_brain', 'xyz'),
throttle=True)

calculate_ants_warp.interface.num_threads = num_threads

Expand Down
41 changes: 25 additions & 16 deletions CPAC/utils/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,48 @@
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
"""
Helpers and aliases for handling typing in main and variant Python versions
Helpers and aliases for handling typing in main and variant Python versions.
Once all variants (see {DOCS_URL_PREFIX}/user/versions#variants)
run Python ≥ 3.10, these global variables can be replaced with the
current preferred syntax.
"""
from pathlib import Path
import sys
from typing import Union

from CPAC.utils.docs import DOCS_URL_PREFIX

# Set the version-specific documentation URL in the module docstring:
__doc__ = __doc__.replace(r'{DOCS_URL_PREFIX}', DOCS_URL_PREFIX)
__doc__ = __doc__.replace(r"{DOCS_URL_PREFIX}", DOCS_URL_PREFIX) # noqa: A001

if sys.version_info >= (3, 8):
from typing import Literal
LITERAL = Literal
from typing import Literal as LITERAL
else:
from typing_extensions import Literal
LITERAL = Literal
from typing_extensions import Literal as LITERAL
if sys.version_info >= (3, 9):
from collections.abc import Iterable
LIST = list
from builtins import dict as DICT, list as LIST
from collections.abc import Iterable as ITERABLE
else:
from typing import Iterable, List
LIST = List
from typing import Dict as DICT, Iterable as ITERABLE, List as LIST
if sys.version_info >= (3, 10):
from builtins import tuple as TUPLE

LIST_OR_STR = LIST[str] | str # pylint: disable=invalid-name
TUPLE = tuple
else:
from typing import Tuple
from typing import Tuple as TUPLE

LIST_OR_STR = Union[LIST[str], str] # pylint: disable=invalid-name
TUPLE = Tuple
ITERABLE = Iterable

PATHSTR = Union[Path, str]
ConfigKeyType = Union[str, LIST[str]]
__all__ = ['ConfigKeyType', 'ITERABLE', 'LIST', 'LIST_OR_STR', 'LITERAL',
'TUPLE']
__all__ = [
"ConfigKeyType",
"DICT",
"ITERABLE",
"LIST",
"LIST_OR_STR",
"LITERAL",
"PATHSTR",
"TUPLE",
]

0 comments on commit 39c2c70

Please sign in to comment.