diff --git a/synthesis_workflow/tasks/circuit.py b/synthesis_workflow/tasks/circuit.py index 990654c..3e812b9 100644 --- a/synthesis_workflow/tasks/circuit.py +++ b/synthesis_workflow/tasks/circuit.py @@ -19,11 +19,11 @@ from .config import CircuitConfig from .config import SynthesisConfig from .luigi_tools import copy_params -from .luigi_tools import GlobalParamTask from .luigi_tools import ParamLink +from .luigi_tools import WorkflowTask -class CreateAtlasLayerAnnotations(GlobalParamTask): +class CreateAtlasLayerAnnotations(WorkflowTask): """Create the annotation file for layers from an atlas. Args: @@ -72,7 +72,7 @@ def output(self): return luigi.LocalTarget(self.layer_annotations_path) -class CreateAtlasPlanes(GlobalParamTask): +class CreateAtlasPlanes(WorkflowTask): """Create plane cuts of an atlas. Args: @@ -129,7 +129,7 @@ def output(self): @copy_params( mtypes=ParamLink(SynthesisConfig), ) -class SliceCircuit(GlobalParamTask): +class SliceCircuit(WorkflowTask): """Create a smaller circuit .mvd3 file for subsampling. Args: diff --git a/synthesis_workflow/tasks/diametrizer.py b/synthesis_workflow/tasks/diametrizer.py index 244e8a1..d3f8b1b 100644 --- a/synthesis_workflow/tasks/diametrizer.py +++ b/synthesis_workflow/tasks/diametrizer.py @@ -23,8 +23,8 @@ from .config import DiametrizerConfig from .config import RunnerConfig from .luigi_tools import copy_params -from .luigi_tools import GlobalParamTask from .luigi_tools import ParamLink +from .luigi_tools import WorkflowTask matplotlib.use("Agg") @@ -67,7 +67,7 @@ def _plot_models(models_params, models_data, fig_folder="figures", ext=".png"): @copy_params( nb_jobs=ParamLink(RunnerConfig), ) -class BuildDiameterModels(GlobalParamTask): +class BuildDiameterModels(WorkflowTask): """Task to build diameter models from set of cells.""" morphs_df_path = luigi.Parameter(default="morphs_df.csv") @@ -144,7 +144,7 @@ def _diametrizer( return gid, new_path, exception -class Diametrize(luigi.Task): +class Diametrize(WorkflowTask): """Task to build diameter models from set of cells.""" morphs_df_path = luigi.Parameter(default="morphs_df.csv") diff --git a/synthesis_workflow/tasks/synthesis.py b/synthesis_workflow/tasks/synthesis.py index 189fbb4..9b0bf72 100644 --- a/synthesis_workflow/tasks/synthesis.py +++ b/synthesis_workflow/tasks/synthesis.py @@ -28,8 +28,8 @@ from .config import RunnerConfig from .config import SynthesisConfig from .luigi_tools import copy_params -from .luigi_tools import GlobalParamTask from .luigi_tools import ParamLink +from .luigi_tools import WorkflowTask morphio.set_maximum_warnings(0) @@ -37,7 +37,7 @@ L = logging.getLogger(__name__) -class ApplySubstitutionRules(luigi.Task): +class ApplySubstitutionRules(WorkflowTask): """Apply substitution rules to the morphology dataframe. Args: @@ -65,7 +65,7 @@ def output(self): @copy_params( tmd_parameters_path=ParamLink(SynthesisConfig), ) -class BuildSynthesisParameters(GlobalParamTask): +class BuildSynthesisParameters(WorkflowTask): """Build the tmd_parameter.json for synthesis. Args: @@ -119,7 +119,7 @@ def output(self): @copy_params( morphology_path=ParamLink(PathConfig), ) -class BuildSynthesisDistributions(GlobalParamTask): +class BuildSynthesisDistributions(WorkflowTask): """Build the tmd_distribution.json for synthesis. Args: @@ -168,7 +168,7 @@ def requires(self): @copy_params( nb_jobs=ParamLink(RunnerConfig), ) -class BuildAxonMorphologies(GlobalParamTask): +class BuildAxonMorphologies(WorkflowTask): """Run choose-morphologies to synthesize axon morphologies. Args: @@ -232,7 +232,7 @@ def output(self): morphology_path=ParamLink(PathConfig), nb_jobs=ParamLink(RunnerConfig), ) -class Synthesize(GlobalParamTask): +class Synthesize(WorkflowTask): """Run placement-algorithm to synthesize morphologies. Args: @@ -265,9 +265,11 @@ def run(self): """""" axon_morphs_path = self.input()["axons"].path + out_mvd3 = self.output()["out_mvd3"] + debug_scales = self.output()["debug_scales"] ensure_dir(axon_morphs_path) - ensure_dir(self.output().path) + ensure_dir(out_mvd3.path) ensure_dir(self.apical_points_path) ensure_dir(PathConfig().synth_output_path) @@ -288,7 +290,7 @@ def run(self): "tmd_parameters": self.input()["tmd_parameters"].path, "tmd_distributions": self.input()["tmd_distributions"].path, "atlas": CircuitConfig().atlas_path, - "out_mvd3": self.output().path, + "out_mvd3": out_mvd3.path, "out_apical": self.apical_points_path, "out_morph_ext": [str(self.ext)], "out_morph_dir": PathConfig().synth_output_path, @@ -301,12 +303,16 @@ def run(self): } run_synthesize_morphologies( - kwargs, nb_jobs=self.nb_jobs, debug_scales=self.debug_region_grower_scales + kwargs, nb_jobs=self.nb_jobs, debug_scales=debug_scales.path ) def output(self): """""" - return luigi.LocalTarget(self.out_circuit_path) + return { + "out_mvd3": luigi.LocalTarget(self.out_circuit_path), + "out_morphologies": luigi.LocalTarget(PathConfig().synth_output_path), + "debug_scales": luigi.LocalTarget(self.debug_region_grower_scales), + } @copy_params( @@ -314,7 +320,7 @@ def output(self): tmd_parameters_path=ParamLink(SynthesisConfig), nb_jobs=ParamLink(RunnerConfig), ) -class AddScalingRulesToParameters(GlobalParamTask): +class AddScalingRulesToParameters(WorkflowTask): """Add scaling rules to tmd_parameter.json.""" scaling_rules_path = luigi.Parameter(default="scaling_rules.yaml") @@ -356,7 +362,7 @@ def output(self): morphology_path=ParamLink(PathConfig), nb_jobs=ParamLink(RunnerConfig), ) -class RescaleMorphologies(GlobalParamTask): +class RescaleMorphologies(WorkflowTask): """Rescale morphologies for synthesis input.""" rescaled_morphology_path = luigi.Parameter(default="rescaled_morphology_path") diff --git a/synthesis_workflow/tasks/vacuum_synthesis.py b/synthesis_workflow/tasks/vacuum_synthesis.py index 5e5f90d..e15b29d 100644 --- a/synthesis_workflow/tasks/vacuum_synthesis.py +++ b/synthesis_workflow/tasks/vacuum_synthesis.py @@ -14,8 +14,8 @@ from .config import RunnerConfig from .config import SynthesisConfig from .luigi_tools import copy_params -from .luigi_tools import GlobalParamTask from .luigi_tools import ParamLink +from .luigi_tools import WorkflowTask from .synthesis import BuildSynthesisDistributions from .synthesis import BuildSynthesisParameters @@ -30,7 +30,7 @@ nb_jobs=ParamLink(RunnerConfig), joblib_verbose=ParamLink(RunnerConfig), ) -class VacuumSynthesize(GlobalParamTask): +class VacuumSynthesize(WorkflowTask): """Grow cells in vacuum, for annotation tasks.""" vacuum_synth_morphology_path = luigi.Parameter(default="vacuum_synth_morphologies") @@ -58,8 +58,8 @@ def run(self): else: mtypes = self.mtypes - Path(self.vacuum_synth_morphology_path).mkdir(parents=True, exist_ok=True) - morphology_base_path = Path(self.vacuum_synth_morphology_path).absolute() + Path(self.output()["out_morphologies"].path).mkdir(parents=True, exist_ok=True) + morphology_base_path = Path(self.output()["out_morphologies"].path).absolute() vacuum_synth_morphs_df = grow_vacuum_morphologies( mtypes, self.n_cells, @@ -69,17 +69,20 @@ def run(self): joblib_verbose=self.joblib_verbose, nb_jobs=self.nb_jobs, ) - vacuum_synth_morphs_df.to_csv(self.output().path, index=False) + vacuum_synth_morphs_df.to_csv(self.output()["out_morphs_df"].path, index=False) def output(self): """""" - return luigi.LocalTarget(self.vacuum_synth_morphs_df_path) + return { + "out_morphs_df": luigi.LocalTarget(self.vacuum_synth_morphs_df_path), + "out_morphologies": luigi.LocalTarget(self.vacuum_synth_morphology_path), + } @copy_params( morphology_path=ParamLink(PathConfig, default="vacuum_morphology_path"), ) -class PlotVacuumMorphologies(GlobalParamTask): +class PlotVacuumMorphologies(WorkflowTask): """Plot morphologies to obtain annotations.""" pdf_filename = luigi.Parameter(default="vacuum_morphologies.pdf") @@ -90,7 +93,7 @@ def requires(self): def run(self): """""" - vacuum_synth_morphs_df = pd.read_csv(self.input().path) + vacuum_synth_morphs_df = pd.read_csv(self.input()["out_morphs_df"].path) ensure_dir(self.output().path) plot_vacuum_morphologies( vacuum_synth_morphs_df, diff --git a/synthesis_workflow/tasks/validation.py b/synthesis_workflow/tasks/validation.py index 0ebc367..9aa5e31 100644 --- a/synthesis_workflow/tasks/validation.py +++ b/synthesis_workflow/tasks/validation.py @@ -22,8 +22,8 @@ from .config import RunnerConfig from .config import SynthesisConfig from .luigi_tools import copy_params -from .luigi_tools import GlobalParamTask from .luigi_tools import ParamLink +from .luigi_tools import WorkflowTask from .synthesis import AddScalingRulesToParameters from .synthesis import BuildSynthesisDistributions from .synthesis import ApplySubstitutionRules @@ -37,7 +37,7 @@ @copy_params( ext=ParamLink(PathConfig), ) -class ConvertMvd3(GlobalParamTask): +class ConvertMvd3(WorkflowTask): """Convert synthesize mvd3 file to morphs_df.csv file. Args: @@ -51,7 +51,7 @@ def requires(self): def run(self): """""" synth_morphs_df = convert_mvd3_to_morphs_df( - self.input().path, PathConfig().synth_output_path, self.ext + self.input()["out_mvd3"].path, PathConfig().synth_output_path, self.ext ) synth_morphs_df.to_csv(self.output().path, index=False) @@ -61,7 +61,7 @@ def output(self): return luigi.LocalTarget(PathConfig().synth_morphs_df_path) -class PlotMorphometrics(luigi.Task): +class PlotMorphometrics(WorkflowTask): """Plot morphometric.""" morph_type = luigi.ChoiceParameter( @@ -128,7 +128,7 @@ def output(self): @copy_params( nb_jobs=ParamLink(RunnerConfig), ) -class PlotDensityProfiles(GlobalParamTask): +class PlotDensityProfiles(WorkflowTask): """Plot density profiles of neurites in an atlas. Args: @@ -151,7 +151,7 @@ def run(self): """""" circuit = load_circuit( - path_to_mvd3=self.input().path, + path_to_mvd3=self.input()["out_mvd3"].path, path_to_morphologies=PathConfig().synth_output_path, path_to_atlas=CircuitConfig().atlas_path, ) @@ -175,7 +175,7 @@ def output(self): nb_jobs=ParamLink(RunnerConfig), joblib_verbose=ParamLink(RunnerConfig), ) -class PlotCollage(GlobalParamTask): +class PlotCollage(WorkflowTask): """Plot collage. Args: @@ -225,7 +225,7 @@ def output(self): nb_jobs=ParamLink(RunnerConfig), joblib_verbose=ParamLink(RunnerConfig), ) -class PlotSingleCollage(GlobalParamTask): +class PlotSingleCollage(WorkflowTask): """Plot collage for single mtype. Args: @@ -253,7 +253,7 @@ def requires(self): def run(self): """""" circuit = load_circuit( - path_to_mvd3=self.input()["synthesis"].path, + path_to_mvd3=self.input()["synthesis"]["out_mvd3"].path, path_to_morphologies=PathConfig().synth_output_path, path_to_atlas=CircuitConfig().atlas_path, ) @@ -282,7 +282,7 @@ def output(self): @copy_params( mtypes=ParamLink(SynthesisConfig), ) -class PlotScales(GlobalParamTask): +class PlotScales(WorkflowTask): """Plot scales. Args: @@ -348,7 +348,7 @@ def output(self): morphology_path=ParamLink(PathConfig), nb_jobs=ParamLink(RunnerConfig), ) -class PlotPathDistanceFits(GlobalParamTask): +class PlotPathDistanceFits(WorkflowTask): """Plot collage for single mtype. Args: diff --git a/synthesis_workflow/tools.py b/synthesis_workflow/tools.py index c73d346..1963c2e 100644 --- a/synthesis_workflow/tools.py +++ b/synthesis_workflow/tools.py @@ -167,7 +167,7 @@ def _wrap_worker(_id, worker, logger_kwargs=None): if file_handler is None: # Setup file name log_file = logger_kwargs.get("log_file", worker.__class__.__name__) - log_file = str(Path(log_file).with_suffix("")) + f"-{_id}.log" + log_file = str(Path(log_file).with_suffix("") / f"scale-{_id}.log") ensure_dir(log_file) # Setup log formatter