Commit bbf3e97

♻️ Fold initiate_rpool into ResourcePool.__init__
shnizzedy committed Jul 10, 2024
1 parent 3334a2a commit bbf3e97
Showing 7 changed files with 263 additions and 274 deletions.
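
In short: the setup that used to be split across initialize_nipype_wf (deleted from cpac_pipeline.py below) and initiate_rpool now happens in the ResourcePool constructor, which builds and owns the nipype workflow as rpool.wf. Below is a minimal sketch of what that consolidated constructor presumably looks like, inferred only from the call sites in this diff and the body of the deleted helper; the real implementation in CPAC/pipeline/engine/resource.py may differ in signature, defaults, and internals.

import os

from CPAC.pipeline import nipype_pipeline_engine as pe


class ResourcePool:
    """Sketch only; the real class lives in CPAC/pipeline/engine/resource.py."""

    def __init__(
        self, cfg, *, data_paths=None, part_id=None, pipeline_name="", wf=None
    ):
        self.cfg = cfg
        if wf is not None:
            # run_node_blocks (engine.py below) still supplies its own workflow.
            self.wf = wf
        else:
            # Folded-in body of the deleted initialize_nipype_wf:
            name = f"_{pipeline_name}" if pipeline_name else ""
            if data_paths is not None:
                ident = f'{data_paths["subject_id"]}_{data_paths["unique_id"]}'
            else:
                ident = part_id  # e.g. "longitudinal_<subject_id>_strat-<strat>"
            self.wf = pe.Workflow(name=f"cpac{name}_{ident}")
            self.wf.base_dir = cfg.pipeline_setup["working_directory"]["path"]
            self.wf.config["execution"] = {
                "hash_method": "timestamp",
                "crashdump_dir": os.path.abspath(
                    cfg.pipeline_setup["log_directory"]["path"]
                ),
            }
        # Ingress of data_paths (formerly initiate_rpool's job) would follow here.
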
40 changes: 14 additions & 26 deletions CPAC/longitudinal_pipeline/longitudinal_workflow.py
@@ -29,9 +29,8 @@
     build_segmentation_stack,
     build_T1w_registration_stack,
     connect_pipeline,
-    initialize_nipype_wf,
 )
-from CPAC.pipeline.engine import initiate_rpool
+from CPAC.pipeline.engine import ResourcePool
 from CPAC.pipeline.nodeblock import nodeblock
 from CPAC.registration import (
     create_fsl_flirt_linear_reg,
@@ -429,16 +428,13 @@ def anat_longitudinal_wf(subject_id, sub_list, config):
         except KeyError:
             input_creds_path = None
 
-        workflow = initialize_nipype_wf(
-            config,
-            session,
-            # just grab the first one for the name
-            name="anat_longitudinal_pre-preproc",
+        rpool = ResourcePool(
+            cfg=config,
+            data_paths=session,
+            pipeline_name="anat_longitudinal_pre-preproc",
         )
-
-        rpool = initiate_rpool(workflow, config, session)
         pipeline_blocks = build_anat_preproc_stack(rpool, config)
-        workflow = connect_pipeline(workflow, config, rpool, pipeline_blocks)
+        workflow = connect_pipeline(rpool.wf, config, rpool, pipeline_blocks)
 
         session_wfs[unique_id] = rpool
 
@@ -474,13 +470,6 @@ def anat_longitudinal_wf(subject_id, sub_list, config):
     )
 
     for strat in strats_brain_dct.keys():
-        wf = initialize_nipype_wf(
-            config,
-            sub_list[0],
-            # just grab the first one for the name
-            name=f"template_node_{strat}",
-        )
-
         config.pipeline_setup["pipeline_name"] = f"longitudinal_{orig_pipe_name}"
 
         template_node_name = f"longitudinal_anat_template_{strat}"
@@ -508,7 +497,9 @@ def anat_longitudinal_wf(subject_id, sub_list, config):
         template_node.inputs.input_skull_list = strats_head_dct[strat]
 
         long_id = f"longitudinal_{subject_id}_strat-{strat}"
-        rpool = initiate_rpool(wf, config, part_id=long_id)
+        rpool = ResourcePool(
+            cfg=config, part_id=long_id, pipeline_name=f"template_node_{strat}"
+        )
         rpool.set_data(
             "space-longitudinal_desc-brain_T1w",
             template_node,
@@ -551,7 +542,7 @@ def anat_longitudinal_wf(subject_id, sub_list, config):
 
         pipeline_blocks = build_segmentation_stack(rpool, config, pipeline_blocks)
 
-        wf = connect_pipeline(wf, config, rpool, pipeline_blocks)
+        wf = connect_pipeline(rpool.wf, config, rpool, pipeline_blocks)
 
         excl = [
             "space-longitudinal_desc-brain_T1w",
@@ -586,10 +577,9 @@ def anat_longitudinal_wf(subject_id, sub_list, config):
         except KeyError:
             session["creds_path"] = None
 
-        wf = initialize_nipype_wf(config, session)
-        rpool = initiate_rpool(wf, config, session)
-
         config.pipeline_setup["pipeline_name"] = f"longitudinal_{orig_pipe_name}"
+        rpool = ResourcePool(cfg=config, data_paths=session)
+        wf = rpool.wf
         rpool.ingress_output_dir()
 
         select_node_name = f"select_{unique_id}"
@@ -651,15 +641,13 @@ def anat_longitudinal_wf(subject_id, sub_list, config):
         except KeyError:
             input_creds_path = None
         session["creds_path"] = input_creds_path
-        wf = initialize_nipype_wf(config, session)
-        rpool = initiate_rpool(wf, config, session)
-
+        rpool = ResourcePool(cfg=config, data_paths=session)
         pipeline_blocks = [
             warp_longitudinal_T1w_to_template,
             warp_longitudinal_seg_to_T1w,
         ]
 
-        wf = connect_pipeline(wf, config, rpool, pipeline_blocks)
+        wf = connect_pipeline(rpool.wf, config, rpool, pipeline_blocks)
 
         rpool.gather_pipes(wf, config)
 
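The same mechanical change repeats throughout this file: callers stop threading a separately constructed workflow through setup and let the pool own it. Schematically, using names taken from the hunks above (old calls shown as comments):

# Before: two coupled setup calls plus a workflow to keep in sync.
# wf = initialize_nipype_wf(config, session, name="anat_longitudinal_pre-preproc")
# rpool = initiate_rpool(wf, config, session)
# workflow = connect_pipeline(wf, config, rpool, pipeline_blocks)

# After: one constructor; the workflow hangs off the pool.
rpool = ResourcePool(
    cfg=config,
    data_paths=session,
    pipeline_name="anat_longitudinal_pre-preproc",
)
pipeline_blocks = build_anat_preproc_stack(rpool, config)
workflow = connect_pipeline(rpool.wf, config, rpool, pipeline_blocks)
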
31 changes: 3 additions & 28 deletions CPAC/pipeline/cpac_pipeline.py
@@ -128,9 +128,8 @@
 )
 
 # pylint: disable=wrong-import-order
-from CPAC.pipeline import nipype_pipeline_engine as pe
 from CPAC.pipeline.check_outputs import check_outputs
-from CPAC.pipeline.engine import initiate_rpool, NodeBlock
+from CPAC.pipeline.engine import NodeBlock, ResourcePool
 from CPAC.pipeline.nipype_pipeline_engine.plugins import (
     LegacyMultiProcPlugin,
     MultiProcPlugin,
@@ -856,24 +855,6 @@ def remove_workdir(wdpath: str) -> None:
         FMLOGGER.warning("Could not remove working directory %s", wdpath)
 
 
-def initialize_nipype_wf(cfg, sub_data_dct, name=""):
-    """Initialize a new nipype workflow."""
-    if name:
-        name = f"_{name}"
-
-    workflow_name = (
-        f'cpac{name}_{sub_data_dct["subject_id"]}_{sub_data_dct["unique_id"]}'
-    )
-    wf = pe.Workflow(name=workflow_name)
-    wf.base_dir = cfg.pipeline_setup["working_directory"]["path"]
-    wf.config["execution"] = {
-        "hash_method": "timestamp",
-        "crashdump_dir": os.path.abspath(cfg.pipeline_setup["log_directory"]["path"]),
-    }
-
-    return wf
-
-
 def load_cpac_pipe_config(pipe_config):
     """Load in pipeline config file."""
     config_file = os.path.realpath(pipe_config)
@@ -1074,7 +1055,6 @@ def build_T1w_registration_stack(rpool, cfg, pipeline_blocks=None):
         warp_wholeheadT1_to_template,
         warp_T1mask_to_template,
     ]
-
     if not rpool.check_rpool("desc-restore-brain_T1w"):
         reg_blocks.append(correct_restore_brain_intensity_abcd)
 
@@ -1176,7 +1156,6 @@ def connect_pipeline(wf, cfg, rpool, pipeline_blocks):
     WFLOGGER.info(
         "Connecting pipeline blocks:\n%s", list_blocks(pipeline_blocks, indent=1)
     )
-
     previous_nb = None
     for block in pipeline_blocks:
         try:
@@ -1221,9 +1200,6 @@ def build_workflow(subject_id, sub_dict, cfg, pipeline_name=None):
     """Build a C-PAC workflow for a single subject."""
     from CPAC.utils.datasource import gather_extraction_maps
 
-    # Workflow setup
-    wf = initialize_nipype_wf(cfg, sub_dict, name=pipeline_name)
-
     # Extract credentials path if it exists
     try:
         creds_path = sub_dict["creds_path"]
@@ -1247,8 +1223,7 @@ def build_workflow(subject_id, sub_dict, cfg, pipeline_name=None):
     # PREPROCESSING
     # """""""""""""""""""""""""""""""""""""""""""""""""""
 
-    rpool = initiate_rpool(wf, cfg, sub_dict)
-
+    rpool = ResourcePool(cfg=cfg, data_paths=sub_dict, pipeline_name=pipeline_name)
     pipeline_blocks = build_anat_preproc_stack(rpool, cfg)
 
     # Anatomical to T1 template registration
@@ -1615,7 +1590,7 @@ def build_workflow(subject_id, sub_dict, cfg, pipeline_name=None):
 
     # Connect the entire pipeline!
     try:
-        wf = connect_pipeline(wf, cfg, rpool, pipeline_blocks)
+        wf = connect_pipeline(rpool.wf, cfg, rpool, pipeline_blocks)
     except LookupError as lookup_error:
         missing_key = None
         errorstrings = [arg for arg in lookup_error.args[0].split("\n") if arg.strip()]
3 changes: 1 addition & 2 deletions CPAC/pipeline/engine/__init__.py
@@ -21,10 +21,9 @@
     run_node_blocks,
     wrap_block,
 )
-from .resource import initiate_rpool, NodeData, ResourcePool
+from .resource import NodeData, ResourcePool
 
 __all__ = [
-    "initiate_rpool",
     "NodeBlock",
     "NodeData",
     "ResourcePool",
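For downstream code, the export change above amounts to:

# No longer exported:
# from CPAC.pipeline.engine import initiate_rpool
# Use the class directly instead:
from CPAC.pipeline.engine import ResourcePool
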
4 changes: 2 additions & 2 deletions CPAC/pipeline/engine/engine.py
@@ -527,7 +527,7 @@ def wrap_block(node_blocks, interface, wf, cfg, strat_pool, pipe_num, opt):
 
 def run_node_blocks(blocks, data_paths, cfg=None):
     from CPAC.pipeline.engine import NodeBlock
-    from CPAC.pipeline.engine.resource import initiate_rpool
+    from CPAC.pipeline.engine.resource import ResourcePool
 
     if not cfg:
         cfg = {
@@ -540,7 +540,7 @@ def run_node_blocks(blocks, data_paths, cfg=None):
     # TODO: WE HAVE TO PARSE OVER UNIQUE ID'S!!!
 
     wf = pe.Workflow(name="node_blocks")
-    rpool = initiate_rpool(wf, cfg, data_paths)
+    rpool = ResourcePool(wf=wf, cfg=cfg, data_paths=data_paths)
     wf.base_dir = cfg.pipeline_setup["working_directory"]["path"]
     wf.config["execution"] = {
         "hash_method": "timestamp",
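run_node_blocks is the one call site that still builds its own workflow, so the constructor evidently also accepts an existing one via wf=. A hedged reading of that hunk (an inference from this diff, not a documented contract: the passed-in workflow is presumably adopted as-is rather than replaced):

from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.pipeline.engine import ResourcePool

wf = pe.Workflow(name="node_blocks")
# cfg and data_paths as in run_node_blocks above
rpool = ResourcePool(wf=wf, cfg=cfg, data_paths=data_paths)
assert rpool.wf is wf  # assumed behavior, not verified against resource.py
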
(Diffs for the remaining three changed files did not load and are not shown.)
