Skip to content

Commit

Permalink
Transform all luigi.tasks into WorkflowTask and improve outputs
Browse files Browse the repository at this point in the history
The rerun feature only works if the entire tree is made of WorkflowTask,
so all tasks are now WorkflowTask.

The task outputs were not always complete, so the rerun feature could not
remove all the files since it didn't know all of them.

Change-Id: Iff243c610602811af528744cfc4e3739bfcb9583
  • Loading branch information
adrien-berchet committed Oct 20, 2020
1 parent c6d1286 commit db706cc
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 39 deletions.
8 changes: 4 additions & 4 deletions synthesis_workflow/tasks/circuit.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
from .config import CircuitConfig
from .config import SynthesisConfig
from .luigi_tools import copy_params
from .luigi_tools import GlobalParamTask
from .luigi_tools import ParamLink
from .luigi_tools import WorkflowTask


class CreateAtlasLayerAnnotations(GlobalParamTask):
class CreateAtlasLayerAnnotations(WorkflowTask):
"""Create the annotation file for layers from an atlas.
Args:
Expand Down Expand Up @@ -72,7 +72,7 @@ def output(self):
return luigi.LocalTarget(self.layer_annotations_path)


class CreateAtlasPlanes(GlobalParamTask):
class CreateAtlasPlanes(WorkflowTask):
"""Create plane cuts of an atlas.
Args:
Expand Down Expand Up @@ -129,7 +129,7 @@ def output(self):
@copy_params(
mtypes=ParamLink(SynthesisConfig),
)
class SliceCircuit(GlobalParamTask):
class SliceCircuit(WorkflowTask):
"""Create a smaller circuit .mvd3 file for subsampling.
Args:
Expand Down
6 changes: 3 additions & 3 deletions synthesis_workflow/tasks/diametrizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
from .config import DiametrizerConfig
from .config import RunnerConfig
from .luigi_tools import copy_params
from .luigi_tools import GlobalParamTask
from .luigi_tools import ParamLink
from .luigi_tools import WorkflowTask


matplotlib.use("Agg")
Expand Down Expand Up @@ -67,7 +67,7 @@ def _plot_models(models_params, models_data, fig_folder="figures", ext=".png"):
@copy_params(
nb_jobs=ParamLink(RunnerConfig),
)
class BuildDiameterModels(GlobalParamTask):
class BuildDiameterModels(WorkflowTask):
"""Task to build diameter models from set of cells."""

morphs_df_path = luigi.Parameter(default="morphs_df.csv")
Expand Down Expand Up @@ -144,7 +144,7 @@ def _diametrizer(
return gid, new_path, exception


class Diametrize(luigi.Task):
class Diametrize(WorkflowTask):
"""Task to build diameter models from set of cells."""

morphs_df_path = luigi.Parameter(default="morphs_df.csv")
Expand Down
30 changes: 18 additions & 12 deletions synthesis_workflow/tasks/synthesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@
from .config import RunnerConfig
from .config import SynthesisConfig
from .luigi_tools import copy_params
from .luigi_tools import GlobalParamTask
from .luigi_tools import ParamLink
from .luigi_tools import WorkflowTask


morphio.set_maximum_warnings(0)

L = logging.getLogger(__name__)


class ApplySubstitutionRules(luigi.Task):
class ApplySubstitutionRules(WorkflowTask):
"""Apply substitution rules to the morphology dataframe.
Args:
Expand Down Expand Up @@ -65,7 +65,7 @@ def output(self):
@copy_params(
tmd_parameters_path=ParamLink(SynthesisConfig),
)
class BuildSynthesisParameters(GlobalParamTask):
class BuildSynthesisParameters(WorkflowTask):
"""Build the tmd_parameter.json for synthesis.
Args:
Expand Down Expand Up @@ -119,7 +119,7 @@ def output(self):
@copy_params(
morphology_path=ParamLink(PathConfig),
)
class BuildSynthesisDistributions(GlobalParamTask):
class BuildSynthesisDistributions(WorkflowTask):
"""Build the tmd_distribution.json for synthesis.
Args:
Expand Down Expand Up @@ -168,7 +168,7 @@ def requires(self):
@copy_params(
nb_jobs=ParamLink(RunnerConfig),
)
class BuildAxonMorphologies(GlobalParamTask):
class BuildAxonMorphologies(WorkflowTask):
"""Run choose-morphologies to synthesize axon morphologies.
Args:
Expand Down Expand Up @@ -232,7 +232,7 @@ def output(self):
morphology_path=ParamLink(PathConfig),
nb_jobs=ParamLink(RunnerConfig),
)
class Synthesize(GlobalParamTask):
class Synthesize(WorkflowTask):
"""Run placement-algorithm to synthesize morphologies.
Args:
Expand Down Expand Up @@ -265,9 +265,11 @@ def run(self):
""""""

axon_morphs_path = self.input()["axons"].path
out_mvd3 = self.output()["out_mvd3"]
debug_scales = self.output()["debug_scales"]

ensure_dir(axon_morphs_path)
ensure_dir(self.output().path)
ensure_dir(out_mvd3.path)
ensure_dir(self.apical_points_path)
ensure_dir(PathConfig().synth_output_path)

Expand All @@ -288,7 +290,7 @@ def run(self):
"tmd_parameters": self.input()["tmd_parameters"].path,
"tmd_distributions": self.input()["tmd_distributions"].path,
"atlas": CircuitConfig().atlas_path,
"out_mvd3": self.output().path,
"out_mvd3": out_mvd3.path,
"out_apical": self.apical_points_path,
"out_morph_ext": [str(self.ext)],
"out_morph_dir": PathConfig().synth_output_path,
Expand All @@ -301,20 +303,24 @@ def run(self):
}

run_synthesize_morphologies(
kwargs, nb_jobs=self.nb_jobs, debug_scales=self.debug_region_grower_scales
kwargs, nb_jobs=self.nb_jobs, debug_scales=debug_scales.path
)

def output(self):
""""""
return luigi.LocalTarget(self.out_circuit_path)
return {
"out_mvd3": luigi.LocalTarget(self.out_circuit_path),
"out_morphologies": luigi.LocalTarget(PathConfig().synth_output_path),
"debug_scales": luigi.LocalTarget(self.debug_region_grower_scales),
}


@copy_params(
morphology_path=ParamLink(PathConfig),
tmd_parameters_path=ParamLink(SynthesisConfig),
nb_jobs=ParamLink(RunnerConfig),
)
class AddScalingRulesToParameters(GlobalParamTask):
class AddScalingRulesToParameters(WorkflowTask):
"""Add scaling rules to tmd_parameter.json."""

scaling_rules_path = luigi.Parameter(default="scaling_rules.yaml")
Expand Down Expand Up @@ -356,7 +362,7 @@ def output(self):
morphology_path=ParamLink(PathConfig),
nb_jobs=ParamLink(RunnerConfig),
)
class RescaleMorphologies(GlobalParamTask):
class RescaleMorphologies(WorkflowTask):
"""Rescale morphologies for synthesis input."""

rescaled_morphology_path = luigi.Parameter(default="rescaled_morphology_path")
Expand Down
19 changes: 11 additions & 8 deletions synthesis_workflow/tasks/vacuum_synthesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from .config import RunnerConfig
from .config import SynthesisConfig
from .luigi_tools import copy_params
from .luigi_tools import GlobalParamTask
from .luigi_tools import ParamLink
from .luigi_tools import WorkflowTask
from .synthesis import BuildSynthesisDistributions
from .synthesis import BuildSynthesisParameters

Expand All @@ -30,7 +30,7 @@
nb_jobs=ParamLink(RunnerConfig),
joblib_verbose=ParamLink(RunnerConfig),
)
class VacuumSynthesize(GlobalParamTask):
class VacuumSynthesize(WorkflowTask):
"""Grow cells in vacuum, for annotation tasks."""

vacuum_synth_morphology_path = luigi.Parameter(default="vacuum_synth_morphologies")
Expand Down Expand Up @@ -58,8 +58,8 @@ def run(self):
else:
mtypes = self.mtypes

Path(self.vacuum_synth_morphology_path).mkdir(parents=True, exist_ok=True)
morphology_base_path = Path(self.vacuum_synth_morphology_path).absolute()
Path(self.output()["out_morphologies"].path).mkdir(parents=True, exist_ok=True)
morphology_base_path = Path(self.output()["out_morphologies"].path).absolute()
vacuum_synth_morphs_df = grow_vacuum_morphologies(
mtypes,
self.n_cells,
Expand All @@ -69,17 +69,20 @@ def run(self):
joblib_verbose=self.joblib_verbose,
nb_jobs=self.nb_jobs,
)
vacuum_synth_morphs_df.to_csv(self.output().path, index=False)
vacuum_synth_morphs_df.to_csv(self.output()["out_morphs_df"].path, index=False)

def output(self):
""""""
return luigi.LocalTarget(self.vacuum_synth_morphs_df_path)
return {
"out_morphs_df": luigi.LocalTarget(self.vacuum_synth_morphs_df_path),
"out_morphologies": luigi.LocalTarget(self.vacuum_synth_morphology_path),
}


@copy_params(
morphology_path=ParamLink(PathConfig, default="vacuum_morphology_path"),
)
class PlotVacuumMorphologies(GlobalParamTask):
class PlotVacuumMorphologies(WorkflowTask):
"""Plot morphologies to obtain annotations."""

pdf_filename = luigi.Parameter(default="vacuum_morphologies.pdf")
Expand All @@ -90,7 +93,7 @@ def requires(self):

def run(self):
""""""
vacuum_synth_morphs_df = pd.read_csv(self.input().path)
vacuum_synth_morphs_df = pd.read_csv(self.input()["out_morphs_df"].path)
ensure_dir(self.output().path)
plot_vacuum_morphologies(
vacuum_synth_morphs_df,
Expand Down
22 changes: 11 additions & 11 deletions synthesis_workflow/tasks/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from .config import RunnerConfig
from .config import SynthesisConfig
from .luigi_tools import copy_params
from .luigi_tools import GlobalParamTask
from .luigi_tools import ParamLink
from .luigi_tools import WorkflowTask
from .synthesis import AddScalingRulesToParameters
from .synthesis import BuildSynthesisDistributions
from .synthesis import ApplySubstitutionRules
Expand All @@ -37,7 +37,7 @@
@copy_params(
ext=ParamLink(PathConfig),
)
class ConvertMvd3(GlobalParamTask):
class ConvertMvd3(WorkflowTask):
"""Convert synthesize mvd3 file to morphs_df.csv file.
Args:
Expand All @@ -51,7 +51,7 @@ def requires(self):
def run(self):
""""""
synth_morphs_df = convert_mvd3_to_morphs_df(
self.input().path, PathConfig().synth_output_path, self.ext
self.input()["out_mvd3"].path, PathConfig().synth_output_path, self.ext
)

synth_morphs_df.to_csv(self.output().path, index=False)
Expand All @@ -61,7 +61,7 @@ def output(self):
return luigi.LocalTarget(PathConfig().synth_morphs_df_path)


class PlotMorphometrics(luigi.Task):
class PlotMorphometrics(WorkflowTask):
"""Plot morphometric."""

morph_type = luigi.ChoiceParameter(
Expand Down Expand Up @@ -128,7 +128,7 @@ def output(self):
@copy_params(
nb_jobs=ParamLink(RunnerConfig),
)
class PlotDensityProfiles(GlobalParamTask):
class PlotDensityProfiles(WorkflowTask):
"""Plot density profiles of neurites in an atlas.
Args:
Expand All @@ -151,7 +151,7 @@ def run(self):
""""""

circuit = load_circuit(
path_to_mvd3=self.input().path,
path_to_mvd3=self.input()["out_mvd3"].path,
path_to_morphologies=PathConfig().synth_output_path,
path_to_atlas=CircuitConfig().atlas_path,
)
Expand All @@ -175,7 +175,7 @@ def output(self):
nb_jobs=ParamLink(RunnerConfig),
joblib_verbose=ParamLink(RunnerConfig),
)
class PlotCollage(GlobalParamTask):
class PlotCollage(WorkflowTask):
"""Plot collage.
Args:
Expand Down Expand Up @@ -225,7 +225,7 @@ def output(self):
nb_jobs=ParamLink(RunnerConfig),
joblib_verbose=ParamLink(RunnerConfig),
)
class PlotSingleCollage(GlobalParamTask):
class PlotSingleCollage(WorkflowTask):
"""Plot collage for single mtype.
Args:
Expand Down Expand Up @@ -253,7 +253,7 @@ def requires(self):
def run(self):
""""""
circuit = load_circuit(
path_to_mvd3=self.input()["synthesis"].path,
path_to_mvd3=self.input()["synthesis"]["out_mvd3"].path,
path_to_morphologies=PathConfig().synth_output_path,
path_to_atlas=CircuitConfig().atlas_path,
)
Expand Down Expand Up @@ -282,7 +282,7 @@ def output(self):
@copy_params(
mtypes=ParamLink(SynthesisConfig),
)
class PlotScales(GlobalParamTask):
class PlotScales(WorkflowTask):
"""Plot scales.
Args:
Expand Down Expand Up @@ -348,7 +348,7 @@ def output(self):
morphology_path=ParamLink(PathConfig),
nb_jobs=ParamLink(RunnerConfig),
)
class PlotPathDistanceFits(GlobalParamTask):
class PlotPathDistanceFits(WorkflowTask):
"""Plot collage for single mtype.
Args:
Expand Down
2 changes: 1 addition & 1 deletion synthesis_workflow/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def _wrap_worker(_id, worker, logger_kwargs=None):
if file_handler is None:
# Setup file name
log_file = logger_kwargs.get("log_file", worker.__class__.__name__)
log_file = str(Path(log_file).with_suffix("")) + f"-{_id}.log"
log_file = str(Path(log_file).with_suffix("") / f"scale-{_id}.log")
ensure_dir(log_file)

# Setup log formatter
Expand Down

0 comments on commit db706cc

Please sign in to comment.