Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Connect fieldmap estimation to preprocessing pipeline of individual DWI runs #140

Merged
merged 9 commits into from
Dec 11, 2020
96 changes: 61 additions & 35 deletions dmriprep/workflows/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ def init_single_subject_wf(subject_id):
return workflow

from .dwi.base import init_dwi_preproc_wf

# Append the dMRI section to the existing anatomical excerpt
# That way we do not need to stream down the number of DWI datasets
anat_preproc_wf.__postdesc__ = (
Expand All @@ -301,61 +302,86 @@ def init_single_subject_wf(subject_id):
"""
)

# SDC Step 0: Determine whether fieldmaps can/should be estimated
fmap_estimators = None
if "fieldmap" not in config.workflow.ignore:
from sdcflows import fieldmaps as fm
from sdcflows.utils.wrangler import find_estimators
from sdcflows.workflows.base import init_fmap_preproc_wf

# SDC Step 1: Run basic heuristics to identify available data for fieldmap estimation
fmap_estimators = find_estimators(config.execution.layout)

# Add fieldmap-less estimators
if not fmap_estimators and config.workflow.use_syn:
# estimators = [fm.FieldmapEstimation()]
raise NotImplementedError

# Nuts and bolts: initialize individual run's pipeline
dwi_preproc_list = []
for dwi_file in subject_data["dwi"]:
dwi_preproc_wf = init_dwi_preproc_wf(dwi_file)
dwi_preproc_wf = init_dwi_preproc_wf(
dwi_file,
has_fieldmap=bool(fmap_estimators),
)

# fmt: off
workflow.connect([
(anat_preproc_wf, dwi_preproc_wf,
[("outputnode.t1w_preproc", "inputnode.t1w_preproc"),
("outputnode.t1w_mask", "inputnode.t1w_mask"),
("outputnode.t1w_dseg", "inputnode.t1w_dseg"),
("outputnode.t1w_aseg", "inputnode.t1w_aseg"),
("outputnode.t1w_aparc", "inputnode.t1w_aparc"),
("outputnode.t1w_tpms", "inputnode.t1w_tpms"),
("outputnode.template", "inputnode.template"),
("outputnode.anat2std_xfm", "inputnode.anat2std_xfm"),
("outputnode.std2anat_xfm", "inputnode.std2anat_xfm"),
# Undefined if --fs-no-reconall, but this is safe
("outputnode.subjects_dir", "inputnode.subjects_dir"),
("outputnode.t1w2fsnative_xfm", "inputnode.t1w2fsnative_xfm"),
("outputnode.fsnative2t1w_xfm", "inputnode.fsnative2t1w_xfm")]),
(anat_preproc_wf, dwi_preproc_wf, [
("outputnode.t1w_preproc", "inputnode.t1w_preproc"),
("outputnode.t1w_mask", "inputnode.t1w_mask"),
("outputnode.t1w_dseg", "inputnode.t1w_dseg"),
("outputnode.t1w_aseg", "inputnode.t1w_aseg"),
("outputnode.t1w_aparc", "inputnode.t1w_aparc"),
("outputnode.t1w_tpms", "inputnode.t1w_tpms"),
("outputnode.template", "inputnode.template"),
("outputnode.anat2std_xfm", "inputnode.anat2std_xfm"),
("outputnode.std2anat_xfm", "inputnode.std2anat_xfm"),
# Undefined if --fs-no-reconall, but this is safe
("outputnode.subjects_dir", "inputnode.subjects_dir"),
("outputnode.t1w2fsnative_xfm", "inputnode.t1w2fsnative_xfm"),
("outputnode.fsnative2t1w_xfm", "inputnode.fsnative2t1w_xfm"),
]),
(bids_info, dwi_preproc_wf, [("subject", "inputnode.subject_id")]),
])
# fmt: on

if "fieldmap" in config.workflow.ignore:
return workflow

from sdcflows import fieldmaps as fm
from sdcflows.utils.wrangler import find_estimators
from sdcflows.workflows.base import init_fmap_preproc_wf
# Keep a handle to each workflow
dwi_preproc_list.append(dwi_preproc_wf)

# SDCFlows connection
# Step 1: Run basic heuristics to identify available data for fieldmap estimation
estimators = find_estimators(config.execution.layout)

if not estimators and config.workflow.use_syn: # Add fieldmap-less estimators
# estimators = [fm.FieldmapEstimation()]
raise NotImplementedError
if not fmap_estimators:
return workflow

# Step 2: Manually add further estimators (e.g., fieldmap-less)
# SDC Step 2: Manually add further estimators (e.g., fieldmap-less)
fmap_wf = init_fmap_preproc_wf(
debug=config.execution.debug,
estimators=estimators,
estimators=fmap_estimators,
omp_nthreads=config.nipype.omp_nthreads,
output_dir=str(output_dir),
subject=subject_id,
)

# TODO: Requires nipreps/sdcflows#147
for dwi_preproc_wf in dwi_preproc_list:
# fmt: off
workflow.connect([
(fmap_wf, dwi_preproc_wf, [
("outputnode.fmap", "inputnode.fmap"),
("outputnode.fmap_ref", "inputnode.fmap_ref"),
("outputnode.fmap_coeff", "inputnode.fmap_coeff"),
("outputnode.fmap_mask", "inputnode.fmap_mask"),
("outputnode.fmap_id", "inputnode.fmap_id"),
]),
])
# fmt: on

# Overwrite ``out_path_base`` of sdcflows's DataSinks
for node in fmap_wf.list_node_names():
if node.split(".")[-1].startswith("ds_"):
fmap_wf.get_node(node).interface.out_path_base = "dmriprep"
workflow.add_nodes([fmap_wf])

# Step 3: Manually connect PEPOLAR
for estimator in estimators:
for estimator in fmap_estimators:
if estimator.method != fm.EstimatorType.PEPOLAR:
continue

Expand All @@ -372,18 +398,18 @@ def init_single_subject_wf(subject_id):
raise NotImplementedError
# from niworkflows.interfaces.utility import KeySelect
# est_id = estimator.bids_id
# fmap_select = pe.MapNode(
# estim_select = pe.MapNode(
# KeySelect(fields=["metadata", "dwi_reference", "dwi_mask", "gradients_rasb",]),
# name=f"fmap_select_{est_id}",
# run_without_submitting=True,
# iterfields=["key"]
# )
# fmap_select.inputs.key = [
# estim_select.inputs.key = [
# str(s.path) for s in estimator.sources if s.suffix in ("epi", "dwi", "sbref")
# ]
# # fmt:off
# workflow.connect([
# (referencenode, fmap_select, [("dwi_file", "keys"),
# (referencenode, estim_select, [("dwi_file", "keys"),
# ("metadata", "metadata"),
# ("dwi_reference", "dwi_reference"),
# ("gradients_rasb", "gradients_rasb")]),
Expand Down
108 changes: 93 additions & 15 deletions dmriprep/workflows/dwi/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ...interfaces import DerivativesDataSink


def init_dwi_preproc_wf(dwi_file):
def init_dwi_preproc_wf(dwi_file, has_fieldmap=False):
"""
Build a preprocessing workflow for one DWI run.

Expand All @@ -30,6 +30,8 @@ def init_dwi_preproc_wf(dwi_file):
----------
dwi_file : :obj:`os.PathLike`
One diffusion MRI dataset to be processed.
has_fieldmap : :obj:`bool`
Build the workflow with a path to register a fieldmap to the DWI.

Inputs
------
Expand All @@ -39,6 +41,17 @@ def init_dwi_preproc_wf(dwi_file):
File path of the b-vectors
in_bval
File path of the b-values
fmap
File path of the fieldmap
fmap_ref
File path of the fieldmap reference
fmap_coeff
josephmje marked this conversation as resolved.
Show resolved Hide resolved
File path of the fieldmap coefficients
fmap_mask
File path of the fieldmap mask
josephmje marked this conversation as resolved.
Show resolved Hide resolved
fmap_id
The BIDS modality label of the fieldmap being used


Outputs
-------
Expand Down Expand Up @@ -67,6 +80,16 @@ def init_dwi_preproc_wf(dwi_file):
f"Creating DWI preprocessing workflow for <{dwi_file.name}>"
)

if has_fieldmap:
from sdcflows.fieldmaps import get_identifier

estimator_key = get_identifier(dwi_file)
if not estimator_key:
has_fieldmap = False
config.loggers.workflow.critical(
f"None of the available B0 fieldmaps are associated to <{dwi_file}>"
)

# Build workflow
workflow = Workflow(name=_get_wf_name(dwi_file.name))

Expand All @@ -77,6 +100,12 @@ def init_dwi_preproc_wf(dwi_file):
"dwi_file",
"in_bvec",
"in_bval",
# From SDCFlows
"fmap",
"fmap_ref",
"fmap_coeff",
"fmap_mask",
josephmje marked this conversation as resolved.
Show resolved Hide resolved
"fmap_id",
# From anatomical
"t1w_preproc",
"t1w_mask",
Expand Down Expand Up @@ -111,20 +140,16 @@ def init_dwi_preproc_wf(dwi_file):
)

# MAIN WORKFLOW STRUCTURE
# fmt:off
# fmt: off
workflow.connect([
(inputnode, gradient_table, [("dwi_file", "dwi_file"),
("in_bvec", "in_bvec"),
("in_bval", "in_bval")]),
(inputnode, dwi_reference_wf, [("dwi_file", "inputnode.dwi_file")]),
(gradient_table, dwi_reference_wf, [("b0_ixs", "inputnode.b0_ixs")]),
(dwi_reference_wf, outputnode, [
("outputnode.ref_image", "dwi_reference"),
("outputnode.dwi_mask", "dwi_mask"),
]),
(gradient_table, outputnode, [("out_rasb", "gradients_rasb")]),
])
# fmt:on
# fmt: on

if config.workflow.run_reconall:
from niworkflows.interfaces.nibabel import ApplyMask
Expand All @@ -142,8 +167,7 @@ def init_dwi_preproc_wf(dwi_file):

ds_report_reg = pe.Node(
DerivativesDataSink(
base_directory=str(config.execution.output_dir),
datatype="figures",
base_directory=str(config.execution.output_dir), datatype="figures",
),
name="ds_report_reg",
run_without_submitting=True,
Expand All @@ -152,7 +176,7 @@ def init_dwi_preproc_wf(dwi_file):
def _bold_reg_suffix(fallback):
return "coreg" if fallback else "bbregister"

# fmt:off
# fmt: off
workflow.connect([
(inputnode, bbr_wf, [
("fsnative2t1w_xfm", "inputnode.fsnative2t1w_xfm"),
Expand All @@ -171,20 +195,74 @@ def _bold_reg_suffix(fallback):
('outputnode.out_report', 'in_file'),
(('outputnode.fallback', _bold_reg_suffix), 'desc')]),
])
# fmt:on
# fmt: on

# REPORTING ############################################################
reportlets_wf = init_reportlets_wf(str(config.execution.output_dir))
# fmt:off
# fmt: off
workflow.connect([
(inputnode, reportlets_wf, [("dwi_file", "inputnode.source_file")]),
(dwi_reference_wf, reportlets_wf, [
("outputnode.ref_image", "inputnode.dwi_ref"),
("outputnode.dwi_mask", "inputnode.dwi_mask"),
("outputnode.validation_report", "inputnode.validation_report"),
]),
(outputnode, reportlets_wf, [
("dwi_reference", "inputnode.dwi_ref"),
("dwi_mask", "inputnode.dwi_mask"),
]),
])
# fmt: on

if not has_fieldmap:
# fmt: off
workflow.connect([
(dwi_reference_wf, outputnode, [("outputnode.ref_image", "dwi_reference"),
("outputnode.dwi_mask", "dwi_mask")]),
])
# fmt: on
return workflow

from niworkflows.interfaces.utility import KeySelect
from sdcflows.workflows.apply.registration import init_coeff2epi_wf
from sdcflows.workflows.apply.correction import init_unwarp_wf

coeff2epi_wf = init_coeff2epi_wf(
omp_nthreads=config.nipype.omp_nthreads, write_coeff=True
)
unwarp_wf = init_unwarp_wf(omp_nthreads=config.nipype.omp_nthreads)
unwarp_wf.inputs.inputnode.metadata = layout.get_metadata(str(dwi_file))

output_select = pe.Node(
KeySelect(fields=["fmap", "fmap_ref", "fmap_coeff", "fmap_mask"]),
name="output_select",
run_without_submitting=True,
)
output_select.inputs.key = estimator_key[0]
if len(estimator_key) > 1:
config.loggers.workflow.warning(
f"Several fieldmaps <{', '.join(estimator_key)}> are "
f"'IntendedFor' <{dwi_file}>, using {estimator_key[0]}"
)

# fmt: off
workflow.connect([
(inputnode, output_select, [("fmap", "fmap"),
("fmap_ref", "fmap_ref"),
("fmap_coeff", "fmap_coeff"),
("fmap_mask", "fmap_mask"),
("fmap_id", "keys")]),
(output_select, coeff2epi_wf, [
("fmap_ref", "inputnode.fmap_ref"),
("fmap_coeff", "inputnode.fmap_coeff"),
("fmap_mask", "inputnode.fmap_mask")]),
(dwi_reference_wf, coeff2epi_wf, [
("outputnode.ref_image", "inputnode.target_ref"),
("outputnode.dwi_mask", "inputnode.target_mask")]),
(dwi_reference_wf, unwarp_wf, [("outputnode.ref_image", "inputnode.distorted")]),
(coeff2epi_wf, unwarp_wf, [
("outputnode.fmap_coeff", "inputnode.fmap_coeff")]),
(unwarp_wf, outputnode, [("outputnode.corrected", "dwi_reference")]),
])
# fmt:on
# fmt: on

return workflow

Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ install_requires =
numpy
pybids >= 0.11.1
pyyaml
sdcflows >= 2.0.0rc2
sdcflows @ git+https://github.com/nipreps/sdcflows.git@9b5c167d81e7670ef2121151b2d59fe190c0167c
smriprep >= 0.8.0rc0
templateflow ~= 0.6
toml
Expand Down