Skip to content

Commit

Permalink
Merge pull request nipreps#55 from oesteban/rf/too-bidsy
Browse files Browse the repository at this point in the history
ENH: Large refactor of the orchestration workflow
  • Loading branch information
oesteban authored Nov 20, 2019
2 parents 83e2dbf + 279fce1 commit 366e4ce
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 57 deletions.
77 changes: 41 additions & 36 deletions sdcflows/workflows/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,6 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
method (for reporting purposes)
"""
if ignore is None:
ignore = tuple()

if not isinstance(ignore, (list, tuple)):
ignore = tuple(ignore)

# TODO: To be removed (filter out unsupported fieldmaps):
fmaps = [fmap for fmap in fmaps if fmap['suffix'] in FMAP_PRIORITY]

workflow = Workflow(name='sdc_estimate_wf' if fmaps else 'sdc_bypass_wf')
inputnode = pe.Node(niu.IdentityInterface(
fields=['epi_file', 'epi_brain', 'epi_mask', 't1w_brain', 'std2anat_xfm']),
Expand All @@ -99,7 +90,8 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
name='outputnode')

# No fieldmaps - forward inputs to outputs
if not fmaps or 'fieldmaps' in ignore:
ignored = False if ignore is None else 'fieldmaps' in ignore
if not fmaps or ignored:
workflow.__postdesc__ = """\
Susceptibility distortion correction (SDC) has been skipped because the
dataset does not contain extra field map acquisitions correctly described
Expand All @@ -119,18 +111,26 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
accurate co-registration with the anatomical reference.
"""

# In case there are multiple fieldmaps prefer EPI
fmaps.sort(key=lambda fmap: FMAP_PRIORITY[fmap['suffix']])
fmap = fmaps[0]
only_syn = 'syn' in fmaps and len(fmaps) == 1

# PEPOLAR path
if 'epi' in fmaps:
from .pepolar import init_pepolar_unwarp_wf, check_pes

# SyN works without this metadata
if epi_meta.get('PhaseEncodingDirection') is None:
raise ValueError(
'PhaseEncodingDirection is not defined within the metadata retrieved '
'for the intended EPI (DWI, BOLD, or SBRef) run.')
outputnode.inputs.method = 'PEB/PEPOLAR (phase-encoding based / PE-POLARity)'

# Filter out EPI fieldmaps to be used
fmaps_epi = [(epi.path, epi.get_metadata()['PhaseEncodingDirection'])
for epi in fmaps['epi']]
fmaps_epi = [(v[0], v[1].get('PhaseEncodingDirection'))
for v in fmaps['epi']]

if not all(list(zip(*fmaps_epi))[1]):
raise ValueError(
'At least one of the EPI runs with alternative phase-encoding '
'blips is missing the required "PhaseEncodingDirection" metadata entry.')

# Find matched PE directions
matched_pe = check_pes(fmaps_epi, epi_meta['PhaseEncodingDirection'])
Expand All @@ -150,32 +150,34 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
])

# FIELDMAP path
elif 'fieldmap' in fmaps:
elif 'fieldmap' in fmaps or 'phasediff' in fmaps:
from .unwarp import init_sdc_unwarp_wf
# Import specific workflows here, so we don't break everything with one
# unused workflow.
suffices = {f.suffix for f in fmaps['fieldmap']}
if 'fieldmap' in suffices:

# SyN works without this metadata
if epi_meta.get('PhaseEncodingDirection') is None:
raise ValueError(
'PhaseEncodingDirection is not defined within the metadata retrieved '
'for the intended EPI (DWI, BOLD, or SBRef) run.')

if 'fieldmap' in fmaps:
from .fmap import init_fmap_wf
outputnode.inputs.method = 'FMB (fieldmap-based)'
outputnode.inputs.method = 'FMB (fieldmap-based) - directly measured B0 map'
fmap_wf = init_fmap_wf(
omp_nthreads=omp_nthreads,
fmap_bspline=False)
# set inputs
fmap_wf.inputs.inputnode.magnitude = fmap['magnitude']
fmap_wf.inputs.inputnode.fieldmap = fmap['fieldmap']
elif 'phasediff' in suffices:
fmap_wf.inputs.inputnode.magnitude = [
m for m, _ in fmaps['fieldmap']['magnitude']]
fmap_wf.inputs.inputnode.fieldmap = [
m for m, _ in fmaps['fieldmap']['fieldmap']]
elif 'phasediff' in fmaps:
from .phdiff import init_phdiff_wf
outputnode.inputs.method = 'FMB (fieldmap-based) - phase-difference map'
fmap_wf = init_phdiff_wf(omp_nthreads=omp_nthreads)
# set inputs
fmap_wf.inputs.inputnode.phasediff = fmap['phasediff']
fmap_wf.inputs.inputnode.magnitude = [
fmap_ for key, fmap_ in sorted(fmap.items())
if key.startswith("magnitude")
]
else:
raise ValueError('Fieldmaps of types %s are not supported' %
', '.join(['"%s"' % f for f in suffices]))
m for m, _ in fmaps['phasediff']['magnitude']]
fmap_wf.inputs.inputnode.phasediff = fmaps['phasediff']['phases']

sdc_unwarp_wf = init_sdc_unwarp_wf(
omp_nthreads=omp_nthreads,
Expand All @@ -193,24 +195,27 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
('outputnode.fmap_ref', 'inputnode.fmap_ref'),
('outputnode.fmap_mask', 'inputnode.fmap_mask')]),
])
elif not only_syn:
raise ValueError('Fieldmaps of types %s are not supported' %
', '.join(['"%s"' % f for f in fmaps]))

# FIELDMAP-less path
if any(fm['suffix'] == 'syn' for fm in fmaps):
if 'syn' in fmaps:
from .syn import init_syn_sdc_wf
syn_sdc_wf = init_syn_sdc_wf(
epi_pe=epi_meta.get('PhaseEncodingDirection', None),
omp_nthreads=omp_nthreads)

workflow.connect([
(inputnode, syn_sdc_wf, [
('epi_file', 'inputnode.in_reference'),
('epi_brain', 'inputnode.in_reference_brain'),
('t1w_brain', 'inputnode.t1w_brain'),
('epi_file', 'inputnode.epi_file'),
('epi_brain', 'inputnode.epi_brain'),
('std2anat_xfm', 'inputnode.std2anat_xfm')]),
])

# XXX Eliminate branch when forcing isn't an option
if fmap['suffix'] == 'syn': # No fieldmaps, but --use-syn
if only_syn: # No fieldmaps, but --use-syn
outputnode.inputs.method = 'FLB ("fieldmap-less", SyN-based)'
sdc_unwarp_wf = syn_sdc_wf
else: # --force-syn was called when other fieldmap was present
Expand Down
13 changes: 10 additions & 3 deletions sdcflows/workflows/gre.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ def init_fmap_postproc_wf(omp_nthreads, fmap_bspline, median_kernel_size=3,
"""
workflow = Workflow(name=name)
inputnode = pe.Node(niu.IdentityInterface(
fields=['fmap_mask', 'fmap_ref', 'fmap']), name='inputnode')
outputnode = pe.Node(niu.IdentityInterface(fields=['out_fmap']),
fields=['fmap_mask', 'fmap_ref', 'fmap', 'metadata']), name='inputnode')
outputnode = pe.Node(niu.IdentityInterface(fields=['out_fmap', 'metadata']),
name='outputnode')
if fmap_bspline:
from ..interfaces.fmap import FieldEnhance
Expand All @@ -156,11 +156,12 @@ def init_fmap_postproc_wf(omp_nthreads, fmap_bspline, median_kernel_size=3,

workflow.connect([
(inputnode, cleanup_wf, [('fmap_mask', 'inputnode.in_mask')]),
(inputnode, recenter, [('fmap', 'in_file')]),
(inputnode, recenter, [(('fmap', _pop), 'in_file')]),
(recenter, denoise, [('out', 'in_file')]),
(denoise, demean, [('out_file', 'in_file')]),
(demean, cleanup_wf, [('out', 'inputnode.in_file')]),
(cleanup_wf, outputnode, [('outputnode.out_file', 'out_fmap')]),
(inputnode, outputnode, [(('metadata', _pop), 'metadata')]),
])

return workflow
Expand Down Expand Up @@ -218,3 +219,9 @@ def _demean(in_file, in_mask=None, usemode=True):
newpath=getcwd())
nb.Nifti1Image(data, nii.affine, nii.header).to_filename(out_file)
return out_file


def _pop(inlist):
if isinstance(inlist, (tuple, list)):
return inlist[0]
return inlist
35 changes: 22 additions & 13 deletions sdcflows/workflows/phdiff.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ def init_phdiff_wf(omp_nthreads, name='phdiff_wf'):
Inputs
------
magnitude : pathlike
Path to the corresponding magnitude path(s).
phasediff : pathlike
Path to the corresponding phase-difference file.
metadata : dict
Metadata dictionary corresponding to the phasediff input
magnitude : list of os.pathlike
List of path(s) the GRE magnitude maps.
phasediff : list of tuple(os.pathlike, dict)
List containing one GRE phase-difference map with its corresponding metadata
(requires ``EchoTime1`` and ``EchoTime2``), or the phase maps for the two
subsequent echoes, with their metadata (requires ``EchoTime``).
Outputs
-------
Expand All @@ -90,39 +90,48 @@ def init_phdiff_wf(omp_nthreads, name='phdiff_wf'):
further improvements of HCP Pipelines [@hcppipelines].
"""

inputnode = pe.Node(niu.IdentityInterface(fields=['magnitude', 'phasediff', 'metadata']),
inputnode = pe.Node(niu.IdentityInterface(fields=['magnitude', 'phasediff']),
name='inputnode')

outputnode = pe.Node(niu.IdentityInterface(
fields=['fmap', 'fmap_ref', 'fmap_mask']), name='outputnode')

split = pe.MapNode(niu.Function(function=_split, output_names=['map_file', 'meta']),
iterfield=['phasediff'], run_without_submitting=True, name='split')

magnitude_wf = init_magnitude_wf(omp_nthreads=omp_nthreads)

# phase diff -> radians
phmap2rads = pe.Node(PhaseMap2rads(), name='phmap2rads',
run_without_submitting=True)
phmap2rads = pe.MapNode(PhaseMap2rads(), name='phmap2rads',
iterfield=['in_file'], run_without_submitting=True)
# FSL PRELUDE will perform phase-unwrapping
prelude = pe.Node(fsl.PRELUDE(), name='prelude')
prelude = pe.MapNode(fsl.PRELUDE(), iterfield=['phase_file'], name='prelude')

fmap_postproc_wf = init_fmap_postproc_wf(omp_nthreads=omp_nthreads,
fmap_bspline=False)
compfmap = pe.Node(Phasediff2Fieldmap(), name='compfmap')

workflow.connect([
(inputnode, compfmap, [('metadata', 'metadata')]),
(inputnode, split, [('phasediff', 'phasediff')]),
(inputnode, magnitude_wf, [('magnitude', 'inputnode.magnitude')]),
(magnitude_wf, prelude, [('outputnode.fmap_ref', 'magnitude_file'),
('outputnode.fmap_mask', 'mask_file')]),
(inputnode, phmap2rads, [('phasediff', 'in_file')]),
(split, phmap2rads, [('map_file', 'in_file')]),
(phmap2rads, prelude, [('out_file', 'phase_file')]),
(split, fmap_postproc_wf, [('meta', 'inputnode.metadata')]),
(prelude, fmap_postproc_wf, [('unwrapped_phase_file', 'inputnode.fmap')]),
(magnitude_wf, fmap_postproc_wf, [
('outputnode.fmap_mask', 'inputnode.fmap_mask'),
('outputnode.fmap_ref', 'inputnode.fmap_ref')]),
(fmap_postproc_wf, compfmap, [('outputnode.out_fmap', 'in_file')]),
(fmap_postproc_wf, compfmap, [('outputnode.out_fmap', 'in_file'),
('outputnode.metadata', 'metadata')]),
(compfmap, outputnode, [('out_file', 'fmap')]),
(magnitude_wf, outputnode, [('outputnode.fmap_ref', 'fmap_ref'),
('outputnode.fmap_mask', 'fmap_mask')]),
])

return workflow


def _split(phasediff):
return phasediff
140 changes: 140 additions & 0 deletions sdcflows/workflows/tests/test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Test base workflows."""
import pytest

from ..base import init_sdc_estimate_wf

EPI_METADATA = {
'MultibandAccelerationFactor': 8,
'PhaseEncodingDirection': 'i',
'RepetitionTime': 0.72,
'TaskName': 'Resting-state fMRI',
}
EPI_FMAP_METADATA_1 = {
'BandwidthPerPixelPhaseEncode': 2290,
'EPIFactor': 90,
'EffectiveEchoSpacing': 0.00058,
'IntendedFor': ['func/sub-HCP101006_task-rest_dir-LR_bold.nii.gz',
'func/sub-HCP101006_task-rest_dir-LR_sbref.nii.gz'],
'MultibandAccelerationFactor': 1,
'PhaseEncodingDirection': 'i-',
}
EPI_FMAP_METADATA_2 = EPI_FMAP_METADATA_1.copy()
EPI_FMAP_METADATA_2['PhaseEncodingDirection'] = 'i'

PHDIFF_METADATA = {
'EchoTime1': 0.00492,
'EchoTime2': 0.00738,
}
PHASE1_METADATA = {
'EchoTime': 0.00492,
}
PHASE2_METADATA = {
'EchoTime': 0.00738,
}


FMAP_DICT_ELEMENTS = {
'epi1': [('sub-HCP101006/fmap/sub-HCP101006_dir-RL_epi.nii.gz',
EPI_FMAP_METADATA_1.copy())],
'epi2': [('sub-HCP101006/fmap/sub-HCP101006_dir-RL_epi.nii.gz',
EPI_FMAP_METADATA_1.copy()),
('sub-HCP101006/fmap/sub-HCP101006_dir-LR_epi.nii.gz',
EPI_FMAP_METADATA_2.copy())],
'phdiff1': {
'magnitude': [('sub-HCP101006/fmap/sub-HCP101006_magnitude1.nii.gz', {}),
('sub-HCP101006/fmap/sub-HCP101006_magnitude2.nii.gz', {})],
'phases': [('sub-HCP101006/fmap/sub-HCP101006_phasediff.nii.gz', PHDIFF_METADATA)],
},
'phdiff2': {
'magnitude': [('sub-HCP101006/fmap/sub-HCP101006_magnitude1.nii.gz', {}),
('sub-HCP101006/fmap/sub-HCP101006_magnitude2.nii.gz', {})],
'phases': [('sub-HCP101006/fmap/sub-HCP101006_phase1.nii.gz',
PHASE1_METADATA.copy()),
('sub-HCP101006/fmap/sub-HCP101006_phase2.nii.gz',
PHASE2_METADATA.copy())],
},
'fmap1': {
'magnitude': [('sub-HCP101006/fmap/sub-HCP101006_magnitude.nii.gz', {})],
'fieldmap': [('sub-HCP101006/fmap/sub-HCP101006_fieldmap.nii.gz', {})],
},
'syn': {
't1w': [('sub-HCP101006/fmap/sub-HCP101006_T1w.nii.gz', {})]
}
}


@pytest.mark.parametrize('method', ['skip', 'phasediff', 'pepolar', 'fieldmap', 'syn'])
def test_base(method):
"""Check the heuristics are correctly applied."""
fieldmaps = {
'epi': FMAP_DICT_ELEMENTS['epi1'].copy(),
'fieldmap': FMAP_DICT_ELEMENTS['fmap1'].copy(),
'phasediff': FMAP_DICT_ELEMENTS['phdiff1'].copy(),
}
epi_meta = EPI_METADATA.copy()

if method == 'skip':
wf = init_sdc_estimate_wf(fmaps=[], epi_meta=epi_meta)
assert wf.inputs.outputnode.method == 'None'

wf = init_sdc_estimate_wf(fmaps=fieldmaps, epi_meta=epi_meta, ignore=('fieldmaps', ))
assert wf.inputs.outputnode.method == 'None'

with pytest.raises(ValueError):
wf = init_sdc_estimate_wf(fmaps={'unsupported': None}, epi_meta=epi_meta)
elif method == 'pepolar':
wf = init_sdc_estimate_wf(fmaps=fieldmaps, epi_meta=epi_meta)
assert 'PEPOLAR' in wf.inputs.outputnode.method

fieldmaps['epi'][0][1].pop('PhaseEncodingDirection')
with pytest.raises(ValueError):
wf = init_sdc_estimate_wf(fmaps=fieldmaps, epi_meta=epi_meta)

fieldmaps['epi'][0][1]['PhaseEncodingDirection'] = \
EPI_FMAP_METADATA_1['PhaseEncodingDirection']
epi_meta.pop('PhaseEncodingDirection')
with pytest.raises(ValueError):
wf = init_sdc_estimate_wf(fmaps=fieldmaps, epi_meta=epi_meta)
epi_meta = EPI_METADATA.copy()

fieldmaps['epi'] = FMAP_DICT_ELEMENTS['epi2']
wf = init_sdc_estimate_wf(fmaps=fieldmaps, epi_meta=epi_meta)
assert 'PEPOLAR' in wf.inputs.outputnode.method

elif method == 'fieldmap':
fieldmaps = {
'fieldmap': FMAP_DICT_ELEMENTS['fmap1'].copy(),
'phasediff': FMAP_DICT_ELEMENTS['phdiff1'].copy(),
}
wf = init_sdc_estimate_wf(fmaps=fieldmaps, epi_meta=epi_meta)
assert 'directly measured B0 map' in wf.inputs.outputnode.method

elif method == 'phasediff':
fieldmaps = {
'phasediff': FMAP_DICT_ELEMENTS['phdiff1'].copy(),
}

wf = init_sdc_estimate_wf(fmaps=fieldmaps, epi_meta=epi_meta)
assert 'phase-difference' in wf.inputs.outputnode.method

epi_meta.pop('PhaseEncodingDirection')
with pytest.raises(ValueError):
wf = init_sdc_estimate_wf(fmaps=fieldmaps, epi_meta=epi_meta)

elif method == 'syn':
fmaps_onlysyn = {'syn': FMAP_DICT_ELEMENTS['syn']}
wf = init_sdc_estimate_wf(fmaps=fmaps_onlysyn, epi_meta=epi_meta)
assert 'SyN' in wf.inputs.outputnode.method

epi_pe = epi_meta.pop('PhaseEncodingDirection')
wf = init_sdc_estimate_wf(fmaps=fmaps_onlysyn, epi_meta=epi_meta)
assert 'SyN' in wf.inputs.outputnode.method

# Emulate --force-syn
fieldmaps.update(fmaps_onlysyn)
with pytest.raises(ValueError):
wf = init_sdc_estimate_wf(fmaps=fieldmaps, epi_meta=epi_meta)

epi_meta['PhaseEncodingDirection'] = epi_pe
wf = init_sdc_estimate_wf(fmaps=fieldmaps, epi_meta=epi_meta)
assert 'PEPOLAR' in wf.inputs.outputnode.method
Loading

0 comments on commit 366e4ce

Please sign in to comment.