Skip to content

Commit

Permalink
✅ Test #1989
Browse files Browse the repository at this point in the history
  • Loading branch information
shnizzedy committed Oct 11, 2023
1 parent d824df4 commit 5a95bf5
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 9 deletions.
Empty file.
190 changes: 190 additions & 0 deletions CPAC/func_preproc/tests/test_preproc_connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# Copyright (C) 2023 C-PAC Developers

# This file is part of C-PAC.

# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.

# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
"""Test graph connections for functional preprocessing"""
import re
from typing import Union

from nipype.interfaces.utility import Function as NipypeFunction, \
IdentityInterface
from nipype.pipeline.engine import Workflow as NipypeWorkflow

import pytest

from CPAC.func_preproc.func_motion import calc_motion_stats, \
func_motion_correct, func_motion_correct_only, func_motion_estimates, \
get_motion_ref, motion_estimate_filter
from CPAC.func_preproc.func_preproc import func_normalize
from CPAC.nuisance.nuisance import choose_nuisance_blocks
from CPAC.pipeline.cpac_pipeline import connect_pipeline
from CPAC.pipeline.engine import NodeBlock, ResourcePool
from CPAC.pipeline.nipype_pipeline_engine import Node, Workflow
from CPAC.registration.registration import coregistration_prep_fmriprep, \
coregistration_prep_mean, coregistration_prep_vol
from CPAC.utils.configuration import Configuration
from CPAC.utils.interfaces.function import Function as CpacFunction
from CPAC.utils.test_init import create_dummy_node
from CPAC.utils.typing import LIST


_FILTERS = [{'filter_type': 'notch', 'filter_order': 4,
'center_frequency': 0.31, 'filter_bandwidth': 0.12},
{'filter_type': 'lowpass', 'filter_order': 4,
'lowpass_cutoff': .0032}]
_PRE_RESOURCES = ['desc-preproc_bold',
'dvars',
'framewise-displacement-jenkinson',
'framewise-displacement-power',
'label-CSF_desc-eroded_mask',
'label-CSF_desc-preproc_mask',
'label-CSF_mask',
'label-GM_desc-eroded_mask',
'label-GM_desc-preproc_mask',
'label-GM_mask',
'label-WM_desc-eroded_mask',
'label-WM_desc-preproc_mask',
'label-WM_mask',
'lateral-ventricles-mask',
'space-T1w_desc-brain_mask',
'space-T1w_desc-eroded_mask',
'space-bold_desc-brain_mask',
'TR',
'desc-brain_T1w',
'from-T1w_to-template_mode-image_desc-linear_xfm',
'from-bold_to-T1w_mode-image_desc-linear_xfm',
'from-template_to-T1w_mode-image_desc-linear_xfm']


def _filter_assertion_message(subwf: NipypeWorkflow, is_filtered: bool,
should_be_filtered: bool) -> str:
if is_filtered and not should_be_filtered:
return (
f'{subwf.name} is filtered by '
f'{" & ".join([node.name for node in is_filtered])} and should '
'not be')
return f'{subwf.name} is not filtered and should be'


@pytest.mark.parametrize('run', [True, False, [True, False]])
@pytest.mark.parametrize('filters', [[_FILTERS[0]], [_FILTERS[1]], _FILTERS])
@pytest.mark.parametrize('regtool', ['ANTs', 'FSL'])
def test_motion_filter_connections(run: Union[bool, LIST[bool]],
filters: LIST[dict], regtool: LIST[str]
) -> None:
"""Test that appropriate connections occur vis-à-vis motion filters"""
# paramaterized Configuration
c = Configuration({
'functional_preproc': {
'motion_estimates_and_correction': {
'motion_estimate_filter': {
'run': run,
'filters': filters},
'run': True}},
'nuisance_corrections': {
'2-nuisance_regression': {
'Regressors': [{
'Name': 'aCompCor, GSR, no censor',
'Motion': {'include_delayed': True,
'include_squared': True,
'include_delayed_squared': True},
'aCompCor': {'summary': {'method': 'DetrendPC',
'components': 5},
'tissues': ['WhiteMatter',
'CerebrospinalFluid'],
'extraction_resolution': 3},
'GlobalSignal': {'summary': 'Mean'},
'PolyOrt': {'degree': 2},
'Bandpass': {'bottom_frequency': 0.01,
'top_frequency': 0.1}}]}}})
# resource for intial inputs
before_this_test = create_dummy_node('created_before_this_test',
_PRE_RESOURCES)
rpool = ResourcePool(cfg=c)
for resource in _PRE_RESOURCES:
if resource.endswith('xfm'):
rpool.set_data(resource, before_this_test, resource, {}, "",
f"created_before_this_test_{regtool}")
else:
rpool.set_data(resource, before_this_test, resource, {}, "",
"created_before_this_test")
pipeline_blocks = []
func_init_blocks = []
func_motion_blocks = []
func_preproc_blocks = []
func_mask_blocks = []
func_prep_blocks = [
calc_motion_stats,
func_normalize,
[coregistration_prep_vol,
coregistration_prep_mean,
coregistration_prep_fmriprep]
]

# Motion Correction
func_motion_blocks = []
if c['functional_preproc', 'motion_estimates_and_correction',
'motion_estimates', 'calculate_motion_first']:
func_motion_blocks = [
get_motion_ref,
func_motion_estimates,
motion_estimate_filter
]
else:
func_motion_blocks = [
get_motion_ref,
func_motion_correct,
motion_estimate_filter
]
if not rpool.check_rpool('movement-parameters'):
if c['functional_preproc', 'motion_estimates_and_correction',
'motion_estimates', 'calculate_motion_first']:
func_blocks = func_init_blocks + func_motion_blocks + \
func_preproc_blocks + [func_motion_correct_only] + \
func_mask_blocks + func_prep_blocks
else:
func_blocks = func_init_blocks + func_preproc_blocks + \
func_motion_blocks + func_mask_blocks + \
func_prep_blocks
else:
func_blocks = func_init_blocks + func_preproc_blocks + \
func_motion_blocks + func_mask_blocks + \
func_prep_blocks
pipeline_blocks += func_blocks
# Nuisance Correction
generate_only = True not in c['nuisance_corrections',
'2-nuisance_regression', 'run']
if not rpool.check_rpool('desc-cleaned_bold'):
pipeline_blocks += choose_nuisance_blocks(c, generate_only)
wf = Workflow(re.sub(r'[\[\]\-\:\_ \'\",]', '', str(rpool)))
connect_pipeline(wf, c, rpool, pipeline_blocks)
regressor_subwfs = [wf.get_node(nodename[:-26]) for nodename in
wf.list_node_names() if
nodename.endswith('build_nuisance_regressors')]
for subwf in regressor_subwfs:
# a motion filter is an input to the nuisance regressor subworkflow
is_filtered = []
# a motion filter should be an input to the regressor subworkflow
should_be_filtered = ('_filt-' in subwf.name and '_filt-none' not in
subwf.name)
for u, v in wf._graph.edges: # pylint: disable=invalid-name,protected-access
if (v == subwf and
isinstance(u.interface, (NipypeFunction, CpacFunction)) and
'notch_filter_motion' in u.interface.inputs.function_str
):
is_filtered.append(u)
assert bool(is_filtered
) == should_be_filtered, _filter_assertion_message(
subwf, is_filtered, should_be_filtered)
2 changes: 1 addition & 1 deletion CPAC/nuisance/nuisance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2334,7 +2334,7 @@ def nuisance_regressors_generation(wf: Workflow, cfg: Configuration,
'motion_estimates_and_correction',
'motion_estimate_filter', 'run']):
wf_name = (f'nuisance_regressors_{opt["Name"]}_filt-'
f'{strat_pool.filter_name}_{pipe_num}')
f'{strat_pool.filter_name(cfg)}_{pipe_num}')
else:
wf_name = f'nuisance_regressors_{opt["Name"]}_{pipe_num}'

Expand Down
13 changes: 10 additions & 3 deletions CPAC/pipeline/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ def get_json(self, resource, strat=None):
else:
raise Exception('\n[!] Developer info: the JSON '
f'information for {resource} and {strat} '
f'is incomplete.\n')
f'is incomplete.\n')
return strat_json

def get_cpac_provenance(self, resource, strat=None):
Expand Down Expand Up @@ -762,8 +762,7 @@ def filtered_movement(self) -> bool:
# not a strat_pool or no movement parameters in strat_pool
return False

@property
def filter_name(self) -> str:
def filter_name(self, cfg) -> str:
"""
In a strat_pool with filtered movement parameters, return the
name of the filter for this strategy
Expand All @@ -772,6 +771,14 @@ def filter_name(self) -> str:
-------
str
"""
motion_filters = cfg['functional_preproc',
'motion_estimates_and_correction',
'motion_estimate_filter', 'filters']
if len(motion_filters) == 1 and cfg.switch_is_on([
'functional_preproc', 'motion_estimates_and_correction',
'motion_estimate_filter', 'run'], exclusive=True
):
return motion_filters[0]['Name']
key = 'movement-parameters'
try:
sidecar = self.get_json(key)
Expand Down
21 changes: 18 additions & 3 deletions CPAC/utils/configuration/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ def __init__(self, config_map=None):
# set Regressor 'Name's if not provided
if 'Name' not in regressor:
regressor['Name'] = f'Regressor-{str(i + 1)}'
# replace spaces with hyphens in Regressor 'Name's
regressor['Name'] = regressor['Name'].replace(' ', '-')
# make Regressor 'Name's Nipype-friendly
regressor['Name'] = nipype_friendly_name(regressor['Name'])

config_map = schema(config_map)

Expand Down Expand Up @@ -446,7 +446,7 @@ def switch_is_off(self, key: ConfigKeyType, exclusive: bool = False

def switch_is_on(self, key: ConfigKeyType, exclusive: bool = False
) -> bool:
'''Return True if the given key is set to both 'on' and 'off'
'''Return True if the given key is set to 'on' OR 'on' and 'off'
or False otherwise. Used for tracking forking.
Parameters
Expand Down Expand Up @@ -731,3 +731,18 @@ def set_subject(sub_dict: dict, pipe_config: 'Configuration',
if not os.path.exists(log_dir):
os.makedirs(os.path.join(log_dir))
return subject_id, p_name, log_dir


def nipype_friendly_name(name: str) -> str:
"""Replace sequences of non-alphanumeric characters with a single
underscore and remove any leading underscores
Parameters
----------
name : str
Returns
-------
str
"""
return re.sub(r'[^a-zA-Z0-9]+', '_', name).lstrip('_')
35 changes: 33 additions & 2 deletions CPAC/utils/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,44 @@
#
# Contributing authors (please append):
# Daniel Clark

# Jon Clucas
'''
This module contains functions that assist in initializing CPAC
tests resources
'''
from typing import Optional

from nipype.interfaces.utility import IdentityInterface

from CPAC.pipeline.nipype_pipeline_engine import Node
from CPAC.utils.typing import LIST


def create_dummy_node(name: str, fields: Optional[LIST[str]] = None):
"""
Create a dummy IdentityInterface Node source for resources upstream
in a graph from a section to be tested
Parameters
----------
name : str
a name for the dummy Node
fields : list of str, optional
a list of resources to be present in the created Node. If not
provided, the only resource will be called 'resource'
Returns
-------
Node
"""
if fields is None:
fields = ['resource']
return Node(IdentityInterface(fields=fields), name=name)


# Return tests data config file
def populate_template_config(config_type):
def populate_template_config(config_type: str) -> str:
'''
Function to read in a template config file from the
CPAC_RESOURCE_DIR and populate it with actual filepaths
Expand Down

0 comments on commit 5a95bf5

Please sign in to comment.