Skip to content

Commit

Permalink
✅ Restore engine unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
shnizzedy committed Jul 10, 2024
1 parent 7a76603 commit 646f49d
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 116 deletions.
194 changes: 86 additions & 108 deletions CPAC/pipeline/test/test_engine.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,22 @@
import os
# Copyright (C) 2021-2024 C-PAC Developers

# This file is part of C-PAC.

# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.

# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
"""Tests for C-PAC pipeline engine."""

from pathlib import Path

import pytest

Expand All @@ -7,148 +25,108 @@
build_workflow,
connect_pipeline,
initialize_nipype_wf,
load_cpac_pipe_config,
)
from CPAC.pipeline.engine import (
ingress_pipeconfig_paths,
ingress_raw_anat_data,
ingress_raw_func_data,
initiate_rpool,
ResourcePool,
)
from CPAC.utils.bids_utils import create_cpac_data_config


@pytest.mark.skip(reason="not a pytest test")
def test_ingress_func_raw_data(pipe_config, bids_dir, test_dir):
sub_data_dct = create_cpac_data_config(bids_dir, skip_bids_validator=True)[0]
cfg = load_cpac_pipe_config(pipe_config)

cfg.pipeline_setup["output_directory"]["path"] = os.path.join(test_dir, "out")
cfg.pipeline_setup["working_directory"]["path"] = os.path.join(test_dir, "work")

from CPAC.utils.configuration import Configuration, Preconfiguration


def _set_up_test(
bids_examples: Path, preconfig: str, tmp_path: Path
) -> tuple[Configuration, dict]:
"""Set up ``cfg`` and ``sub_data`` for engine tests."""
bids_dir = str(bids_examples / "ds051")
sub_data = create_cpac_data_config(bids_dir, skip_bids_validator=True)[0]
cfg = Preconfiguration(preconfig)
cfg.pipeline_setup["output_directory"]["path"] = str(tmp_path / "out")
cfg.pipeline_setup["working_directory"]["path"] = str(tmp_path / "work")
cfg.pipeline_setup["log_directory"]["path"] = str(tmp_path / "logs")
return (cfg, sub_data)


@pytest.mark.parametrize("preconfig", ["default"])
def test_ingress_func_raw_data(
bids_examples: Path, preconfig: str, tmp_path: Path
) -> None:
"""Test :py:method:~`CPAC.pipeline.engine.resource.ResourcePool.ingress_raw_func_data`."""
cfg, sub_data_dct = _set_up_test(bids_examples, preconfig, tmp_path)
wf = initialize_nipype_wf(cfg, sub_data_dct)

part_id = sub_data_dct["subject_id"]
ses_id = sub_data_dct["unique_id"]

unique_id = f"{part_id}_{ses_id}"

rpool = ResourcePool(name=unique_id, cfg=cfg)

rpool = ResourcePool(name=unique_id, cfg=cfg, data_paths=sub_data_dct, wf=wf)
if "func" in sub_data_dct:
wf, rpool, diff, blip, fmap_rp_list = ingress_raw_func_data(
wf, rpool, cfg, sub_data_dct, unique_id, part_id, ses_id
)

rpool.ingress_raw_func_data()
rpool.gather_pipes(wf, cfg, all=True)

wf.run()


@pytest.mark.skip(reason="not a pytest test")
def test_ingress_anat_raw_data(pipe_config, bids_dir, test_dir):
sub_data_dct = create_cpac_data_config(bids_dir, skip_bids_validator=True)[0]
cfg = load_cpac_pipe_config(pipe_config)

cfg.pipeline_setup["output_directory"]["path"] = os.path.join(test_dir, "out")
cfg.pipeline_setup["working_directory"]["path"] = os.path.join(test_dir, "work")

@pytest.mark.parametrize("preconfig", ["default"])
def test_ingress_anat_raw_data(
bids_examples: Path, preconfig: str, tmp_path: Path
) -> None:
"""Test :py:method:~`CPAC.pipeline.engine.resource.ResourcePool.ingress_raw_anat_data`."""
cfg, sub_data_dct = _set_up_test(bids_examples, preconfig, tmp_path)
wf = initialize_nipype_wf(cfg, sub_data_dct)

part_id = sub_data_dct["subject_id"]
ses_id = sub_data_dct["unique_id"]

unique_id = f"{part_id}_{ses_id}"

rpool = ResourcePool(name=unique_id, cfg=cfg)

rpool = ingress_raw_anat_data(
wf, rpool, cfg, sub_data_dct, unique_id, part_id, ses_id
rpool = ResourcePool(
name=unique_id,
cfg=cfg,
data_paths=sub_data_dct,
unique_id=unique_id,
part_id=part_id,
ses_id=ses_id,
wf=wf,
)

rpool.ingress_raw_anat_data()
rpool.gather_pipes(wf, cfg, all=True)

wf.run()


@pytest.mark.skip(reason="not a pytest test")
def test_ingress_pipeconfig_data(pipe_config, bids_dir, test_dir):
sub_data_dct = create_cpac_data_config(bids_dir, skip_bids_validator=True)[0]
cfg = load_cpac_pipe_config(pipe_config)

cfg.pipeline_setup["output_directory"]["path"] = os.path.join(test_dir, "out")
cfg.pipeline_setup["working_directory"]["path"] = os.path.join(test_dir, "work")
cfg.pipeline_setup["log_directory"]["path"] = os.path.join(test_dir, "logs")

@pytest.mark.parametrize("preconfig", ["default"])
def test_ingress_pipeconfig_data(
bids_examples: Path, preconfig: str, tmp_path: Path
) -> None:
"""Test :py:method:~`CPAC.pipeline.engine.resource.ResourcePool.ingress_pipeconfig_paths`."""
cfg, sub_data_dct = _set_up_test(bids_examples, preconfig, tmp_path)
wf = initialize_nipype_wf(cfg, sub_data_dct)

part_id = sub_data_dct["subject_id"]
ses_id = sub_data_dct["unique_id"]

unique_id = f"{part_id}_{ses_id}"

rpool = ResourcePool(name=unique_id, cfg=cfg)

rpool = ingress_pipeconfig_paths(cfg, rpool, sub_data_dct, unique_id)

rpool = ResourcePool(
name=unique_id,
cfg=cfg,
data_paths=sub_data_dct,
part_id=part_id,
ses_id=ses_id,
unique_id=unique_id,
)
rpool.ingress_pipeconfig_paths()
rpool.gather_pipes(wf, cfg, all=True)

wf.run()


@pytest.mark.skip(reason="not a pytest test")
def test_build_anat_preproc_stack(pipe_config, bids_dir, test_dir):
sub_data_dct = create_cpac_data_config(bids_dir, skip_bids_validator=True)[0]
cfg = load_cpac_pipe_config(pipe_config)

cfg.pipeline_setup["output_directory"]["path"] = os.path.join(test_dir, "out")
cfg.pipeline_setup["working_directory"]["path"] = os.path.join(test_dir, "work")
cfg.pipeline_setup["log_directory"]["path"] = os.path.join(test_dir, "logs")
@pytest.mark.parametrize("preconfig", ["anat-only"])
def test_build_anat_preproc_stack(
bids_examples: Path, preconfig: str, tmp_path: Path
) -> None:
"""Test :py:func:~`CPAC.pipeline.cpac_pipeline.build_anat_preproc_stack`."""
cfg, sub_data_dct = _set_up_test(bids_examples, preconfig, tmp_path)

wf = initialize_nipype_wf(cfg, sub_data_dct)

wf, rpool = initiate_rpool(wf, cfg, sub_data_dct)

rpool = initiate_rpool(wf, cfg, sub_data_dct)
pipeline_blocks = build_anat_preproc_stack(rpool, cfg)
wf = connect_pipeline(wf, cfg, rpool, pipeline_blocks)

rpool.gather_pipes(wf, cfg)

wf.run()


@pytest.mark.skip(reason="not a pytest test")
def test_build_workflow(pipe_config, bids_dir, test_dir):
sub_data_dct = create_cpac_data_config(bids_dir, skip_bids_validator=True)[0]
cfg = load_cpac_pipe_config(pipe_config)

cfg.pipeline_setup["output_directory"]["path"] = os.path.join(test_dir, "out")
cfg.pipeline_setup["working_directory"]["path"] = os.path.join(test_dir, "work")
cfg.pipeline_setup["log_directory"]["path"] = os.path.join(test_dir, "logs")

@pytest.mark.parametrize("preconfig", ["default"])
def test_build_workflow(bids_examples: Path, preconfig: str, tmp_path: Path) -> None:
"""Test :py:func:~`CPAC.pipeline.cpac_pipeline.build_workflow`."""
cfg, sub_data_dct = _set_up_test(bids_examples, preconfig, tmp_path)
wf = initialize_nipype_wf(cfg, sub_data_dct)

wf, rpool = initiate_rpool(wf, cfg, sub_data_dct)

wf, _, _ = build_workflow(sub_data_dct["subject_id"], sub_data_dct, cfg)

rpool = initiate_rpool(wf, cfg, sub_data_dct)
wf = build_workflow(sub_data_dct["subject_id"], sub_data_dct, cfg)
rpool.gather_pipes(wf, cfg)

wf.run()


# bids_dir = "/Users/steven.giavasis/data/HBN-SI_dataset/rawdata"
# test_dir = "/test_dir"

# cfg = "/Users/hecheng.jin/GitHub/DevBranch/CPAC/resources/configs/pipeline_config_monkey-ABCD.yml"
cfg = "/Users/hecheng.jin/GitHub/pipeline_config_monkey-ABCDlocal.yml"
bids_dir = "/Users/hecheng.jin/Monkey/monkey_data_oxford/site-ucdavis"
test_dir = "/Users/hecheng.jin/GitHub/Test/T2preproc"

# test_ingress_func_raw_data(cfg, bids_dir, test_dir)
# test_ingress_anat_raw_data(cfg, bids_dir, test_dir)
# test_ingress_pipeconfig_data(cfg, bids_dir, test_dir)
# test_build_anat_preproc_stack(cfg, bids_dir, test_dir)
if __name__ == "__main__":
test_build_workflow(cfg, bids_dir, test_dir)
12 changes: 4 additions & 8 deletions CPAC/resources/tests/test_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,16 @@
import pytest

from CPAC.pipeline import ALL_PIPELINE_CONFIGS
from CPAC.pipeline.engine import ingress_pipeconfig_paths, ResourcePool
from CPAC.pipeline.engine import ResourcePool
from CPAC.utils.configuration import Preconfiguration
from CPAC.utils.datasource import get_highest_local_res


@pytest.mark.parametrize("pipeline", ALL_PIPELINE_CONFIGS)
def test_packaged_path_exists(pipeline):
"""
Check that all local templates are included in image at at
least one resolution.
"""
rpool = ingress_pipeconfig_paths(
Preconfiguration(pipeline), ResourcePool(), "pytest"
)
"""Check that all local templates are included in at least one resolution."""
rpool = ResourcePool(cfg=Preconfiguration(pipeline), unique_id="pytest")
rpool.ingress_pipeconfig_paths()
for resource in rpool.rpool.values():
node = next(iter(resource.values())).get("data")[0]
if hasattr(node.inputs, "template") and not node.inputs.template.startswith(
Expand Down

0 comments on commit 646f49d

Please sign in to comment.