From e2c68dc3819d95feb5c253089be56c37c21cb9f7 Mon Sep 17 00:00:00 2001 From: Adrien Berchet Date: Wed, 25 Nov 2020 17:39:11 +0100 Subject: [PATCH] Fix parallelization in vacuum synthesis Change-Id: I62eebd9acaffe302a6fe9f5f38978d461dcaf553 --- src/synthesis_workflow/tasks/synthesis.py | 6 ++-- src/synthesis_workflow/vacuum_synthesis.py | 34 ++++++++++++++-------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/synthesis_workflow/tasks/synthesis.py b/src/synthesis_workflow/tasks/synthesis.py index 6023d35..d3798a7 100644 --- a/src/synthesis_workflow/tasks/synthesis.py +++ b/src/synthesis_workflow/tasks/synthesis.py @@ -338,10 +338,8 @@ def run(self): destination=annotations_file, ) else: - annotations_file = ( - Path(PathConfig().local_synthesis_input_path) - / self.annotations_path - ) + input_task_target = yield GetSynthesisInputs() + annotations_file = input_task_target.ppath / self.annotations_path axon_cells = None neurondb_path = find_case_insensitive_file(self.get_neuron_db_path("dat")) diff --git a/src/synthesis_workflow/vacuum_synthesis.py b/src/synthesis_workflow/vacuum_synthesis.py index da2e74a..0197170 100644 --- a/src/synthesis_workflow/vacuum_synthesis.py +++ b/src/synthesis_workflow/vacuum_synthesis.py @@ -1,7 +1,10 @@ """Functions for synthesis to be used by luigi tasks.""" +from functools import partial + import matplotlib.pyplot as plt import numpy as np import pandas as pd +from joblib import cpu_count from joblib import delayed from joblib import Parallel from matplotlib.backends.backend_pdf import PdfPages @@ -52,6 +55,15 @@ def _grow_morphology( return vacuum_synth_morphs_df +def _external_diametrizer(neuron, model, neurite_type, diameter_params=None): + return build_diameters.build( + neuron, + model, + [neurite_type], + diameter_params, + ) + + def grow_vacuum_morphologies( mtypes, n_cells, @@ -75,23 +87,21 @@ def grow_vacuum_morphologies( tmd_distributions["mtypes"][mtype]["diameter"]["method"] = diametrizer if diametrizer == "external": - - def external_diametrizer(neuron, model, neurite_type): - return build_diameters.build( - neuron, - model, - [neurite_type], - tmd_parameters[mtype][ # pylint: disable=cell-var-from-loop - "diameter_params" - ], - ) - + external_diametrizer = partial( + _external_diametrizer, + diameter_params=tmd_parameters[mtype]["diameter_params"], + ) else: external_diametrizer = None gids = range(global_gid, global_gid + n_cells) global_gid += n_cells - for row in Parallel(nb_jobs, verbose=joblib_verbose)( + for row in Parallel( + nb_jobs, + verbose=joblib_verbose, + backend="multiprocessing", + batch_size=1 + int(len(gids) / (cpu_count() if nb_jobs == -1 else nb_jobs)), + )( delayed(_grow_morphology)( gid, mtype,