diff --git a/sdcflows/workflows/base.py b/sdcflows/workflows/base.py index a988a78862..0208c3fdbf 100644 --- a/sdcflows/workflows/base.py +++ b/sdcflows/workflows/base.py @@ -70,15 +70,25 @@ def init_fmap_preproc_wf( from .outputs import init_fmap_derivatives_wf, init_fmap_reports_wf workflow = Workflow(name=name) - for estimator in estimators: + + out_fields = ("fmap", "fmap_ref", "fmap_coeff", "fmap_mask") + out_merge = { + f: pe.Node(niu.Merge(len(estimators)), name=f"out_merge_{f}") + for f in out_fields + } + outputnode = pe.Node(niu.IdentityInterface(fields=out_fields), name="outputnode") + + workflow.connect([ + (mergenode, outputnode, [("out", field)]) + for field, mergenode in out_merge.items() + ]) + + for n, estimator in enumerate(estimators): est_wf = estimator.get_workflow(omp_nthreads=omp_nthreads, debug=debug) source_files = [str(f.path) for f in estimator.sources] - outputnode = pe.Node( - niu.IdentityInterface( - fields=["fmap", "fmap_ref", "fmap_coeff", "fmap_mask"] - ), - name=f"out_{estimator.bids_id}", + out_map = pe.Node( + niu.IdentityInterface(fields=out_fields), name=f"out_{estimator.bids_id}" ) fmap_derivatives_wf = init_fmap_derivatives_wf( @@ -116,7 +126,7 @@ def init_fmap_preproc_wf( (est_wf, fmap_reports_wf, [ ("outputnode.fmap_mask", "inputnode.fmap_mask"), ]), - (est_wf, outputnode, [("outputnode.fmap_mask", "fmap_mask")]), + (est_wf, out_map, [("outputnode.fmap_mask", "fmap_mask")]), ]) # fmt:on @@ -131,11 +141,15 @@ def init_fmap_preproc_wf( ("outputnode.fmap", "inputnode.fieldmap"), ("outputnode.fmap_ref", "inputnode.fmap_ref"), ]), - (est_wf, outputnode, [ + (est_wf, out_map, [ ("outputnode.fmap", "fmap"), ("outputnode.fmap_ref", "fmap_ref"), ("outputnode.fmap_coeff", "fmap_coeff"), ]), ]) # fmt:on + + for field, mergenode in out_merge.items(): + workflow.connect(out_map, field, mergenode, f"in{n}") + return workflow