Fix logger for region-grower
The loggers are updated according to https://bbpcode.epfl.ch/code/#/c/50896
Each process creates a log file in which region-grower logs the scale factors
and neuron properties.

The parser is also updated to match these changes so that the scale
statistics can still be plotted.

Change-Id: Iffd1da7838dc04fa6eba36d51a297c071680f9ca
adrien-berchet committed Oct 20, 2020
1 parent be274b3 commit a853dd5
Showing 11 changed files with 168 additions and 55 deletions.
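
For reference, here is a minimal sketch (not part of the commit) of the kind of line the per-worker log files contain after this change, and of how the updated parse_log regexes extract the worker task ID and the JSON payload. The timestamp, task ID and payload keys are hypothetical; only the line layout and the regex come from the diff below.

import json
import re

# Hypothetical log line, laid out as IdProcessingFormatter writes it:
# "%(asctime)s - %(name)s - %(levelname)s -- [WORKER TASK ID=<id>] %(message)s"
line = (
    "2020/10/20 10:00:00 - region_grower - DEBUG -- "
    '[WORKER TASK ID=42] Neurite hard limit rescaling: {"scale": 0.9}'
)

# Same default as the new PlotScales.neurite_hard_limit_regex parameter.
neurite_hard_limit_regex = (
    r".*\[WORKER TASK ID=([0-9]*)\] Neurite hard limit rescaling: (.*)"
)

match = re.search(neurite_hard_limit_regex, line)
if match:
    worker_task_id, payload = match.groups()
    data = json.loads(payload)  # the scale factors are logged as JSON
    data["worker_task_id"] = worker_task_id
    print(data)  # {'scale': 0.9, 'worker_task_id': '42'}
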
2 changes: 2 additions & 0 deletions .gitignore
@@ -8,3 +8,5 @@ doc/build
build
dist
.eggs

reports
2 changes: 2 additions & 0 deletions requirements-test.pip
@@ -1,3 +1,5 @@
brainbuilder
pytest
pytest-cov
pytest-html
pytest-mpl
3 changes: 3 additions & 0 deletions setup.py
@@ -16,6 +16,7 @@
"atlas_analysis",
"bluepyefe",
"bluepyopt",
"bluepymm",
"diameter_synthesis",
"h5py",
"joblib",
@@ -45,6 +46,8 @@
"brainbuilder",
"pytest",
"pytest-cov",
"pytest-html",
"pytest-mpl",
]

VERSION = imp.load_source("", "synthesis_workflow/version.py").__version__
19 changes: 17 additions & 2 deletions synthesis_workflow/synthesis.py
@@ -259,7 +259,7 @@ def create_axon_morphologies_tsv(
axon_morphs.to_csv(axon_morphs_path, sep="\t", index=True)


def run_synthesize_morphologies(kwargs, nb_jobs=-1):
def run_synthesize_morphologies(kwargs, nb_jobs=-1, debug_scales=None):
"""Run placement algorithm from python.
Args:
@@ -301,8 +301,23 @@ def run_synthesize_morphologies(kwargs, nb_jobs=-1):
"seed": 0,
}

# Set logging arguments
logger_kwargs = None
if debug_scales is not None:
logger_kwargs = {
"log_level": logging.DEBUG,
"log_file": debug_scales,
}

# Run
run_master(SynthesizeMorphologiesMaster, kwargs, parser_args, defaults, nb_jobs)
run_master(
SynthesizeMorphologiesMaster,
kwargs,
parser_args,
defaults,
nb_jobs,
logger_kwargs=logger_kwargs,
)


def get_target_length(soma_layer, target_layer, cortical_thicknesses):
2 changes: 1 addition & 1 deletion synthesis_workflow/tasks/circuit.py
@@ -41,7 +41,7 @@ class CreateAtlasLayerAnnotations(GlobalParamTask):
def run(self):
""""""
atlas = LocalAtlas(CircuitConfig().atlas_path)
ids, names = atlas.get_layer_ids()
ids, names = atlas.get_layer_ids() # pylint: disable=no-member
annotation = VoxelData.load_nrrd(
Path(CircuitConfig().atlas_path) / "brain_regions.nrrd"
)
15 changes: 4 additions & 11 deletions synthesis_workflow/tasks/synthesis.py
@@ -248,7 +248,7 @@ class Synthesize(GlobalParamTask):
out_circuit_path = luigi.Parameter(default="sliced_circuit_morphologies.mvd3")
axon_morphs_base_dir = luigi.OptionalParameter(default=None)
apical_points_path = luigi.Parameter(default="apical_points.yaml")
debug_region_grower_scales = luigi.BoolParameter(default=False)
debug_region_grower_scales = luigi.OptionalParameter(default=None)

def requires(self):
""""""
@@ -300,16 +300,9 @@ def run(self):
"seed": str(0),
}

if self.debug_region_grower_scales:
# pylint: disable=protected-access
# from region_grower import context
# context.SpaceNeuronGrower._DEBUG_SCALES = True
# context.SpaceContext._DEBUG_SCALES = True
raise NotImplementedError(
"This feature is not implemented yet in region-grower"
)

run_synthesize_morphologies(kwargs, nb_jobs=self.nb_jobs)
run_synthesize_morphologies(
kwargs, nb_jobs=self.nb_jobs, debug_scales=self.debug_region_grower_scales
)

def output(self):
""""""
18 changes: 11 additions & 7 deletions synthesis_workflow/tasks/validation.py
@@ -289,21 +289,25 @@ class PlotScales(GlobalParamTask):
scales_base_path (str): path to the output folder
log_file (str): log file to parse
mtypes (list(str)): mtypes to plot
apical_target_length_regex (str): regex used to find apical target lengths
neuron_type_position_regex (str): regex used to find neuron type and position
default_scale_regex (str): regex used to find default scales
target_scale_regex (str): regex used to find target scales
neurite_hard_limit_regex (str): regex used to find neurite hard limits
"""

scales_base_path = luigi.Parameter(default="scales")
log_file = luigi.Parameter(default="synthesis_workflow.log")
apical_target_length_regex = luigi.Parameter(
default="Apical target length rescaling: (.*)"
neuron_type_position_regex = luigi.Parameter(
default=r".*\[WORKER TASK ID=([0-9]*)\] Neurite type and position: (.*)"
)
default_scale_regex = luigi.Parameter(
default=r".*\[WORKER TASK ID=([0-9]*)\] Default barcode scale: (.*)"
)
target_scale_regex = luigi.Parameter(
default=r".*\[WORKER TASK ID=([0-9]*)\] Target barcode scale: (.*)"
)
default_scale_regex = luigi.Parameter(default="Default barcode scale: (.*)")
target_scale_regex = luigi.Parameter(default="Target barcode scale: (.*)")
neurite_hard_limit_regex = luigi.Parameter(
default="Neurite hard limit rescaling: (.*)"
default=r".*\[WORKER TASK ID=([0-9]*)\] Neurite hard limit rescaling: (.*)"
)

def requires(self):
@@ -323,7 +327,7 @@ def run(self):
# Plot statistics
scale_data = parse_log(
self.log_file,
self.apical_target_length_regex,
self.neuron_type_position_regex,
self.default_scale_regex,
self.target_scale_regex,
self.neurite_hard_limit_regex,
108 changes: 93 additions & 15 deletions synthesis_workflow/tools.py
@@ -17,17 +17,14 @@
from placement_algorithm.exceptions import SkipSynthesisError
from morph_tool.utils import neurondb_dataframe, find_morph

from synthesis_workflow.utils import setup_logging


def add_taxonomy(morphs_df, pc_in_types):
"""From a dict with mtype to morph_class, fill in the morphs_df dataframe.
Args:
pc_in_types (dict): dict of the form [mtype]: [IN|PC]
"""
for mtype, morph_class in pc_in_types.items():
morphs_df.loc[morphs_df.mtype == mtype, "morph_class"] = morph_class
morphs_df["morph_class"] = morphs_df["mtype"].map(pc_in_types)
return morphs_df


@@ -122,15 +119,82 @@ def update_morphs_df(morphs_df_path, new_morphs_df):
return pd.read_csv(morphs_df_path).merge(new_morphs_df, how="left")


class IdProcessingFormatter(logging.Formatter):
"""Logging formatter class"""

def __init__(self, fmt=None, datefmt=None, current_id=None):
if fmt is None:
fmt = "%(asctime)s - %(name)s - %(levelname)s -- %(message)s"
if datefmt is None:
datefmt = "%Y/%m/%d %H:%M:%S"

super().__init__(fmt, datefmt)
self.orig_datefmt = datefmt
self.orig_fmt = fmt
if current_id is not None:
self.set_id(current_id)

def set_id(self, new_id):
"""Update current ID to insert in format"""
if new_id is None:
new_fmt = self.orig_fmt
else:
msg_marker = "%(message)s"
parts = self.orig_fmt.split(msg_marker, maxsplit=1)
new_fmt = parts[0] + f"[WORKER TASK ID={new_id}] " + msg_marker + parts[1]
super().__init__(new_fmt, self.orig_datefmt)


class DebugingFileHandler(logging.FileHandler):
"""Logging class that can be retrieved"""


def _wrap_worker(_id, worker, logger_kwargs=None):
"""Wrap the worker job and catch exceptions that must be caught"""
try:
file_handler = None
if logger_kwargs is not None:
setup_logging(**logger_kwargs)

# Search old handlers
root = logging.getLogger()
root.setLevel(logging.DEBUG)
for i in root.handlers:
if isinstance(i, DebugingFileHandler):
file_handler = i
break

# If no DebugingFileHandler was found
if file_handler is None:
# Setup file name
log_file = logger_kwargs.get("log_file", worker.__class__.__name__)
log_file = str(Path(log_file).with_suffix("")) + f"-{_id}.log"
ensure_dir(log_file)

# Setup log formatter
formatter = IdProcessingFormatter(
fmt=logger_kwargs.get("log_format"),
datefmt=logger_kwargs.get("date_format"),
current_id=_id,
)

# Setup handler
file_handler = DebugingFileHandler(log_file)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.DEBUG)
root.addHandler(file_handler)
else:
file_handler.formatter.set_id(_id)

with warnings.catch_warnings():
# Ignore all warnings in workers
warnings.simplefilter("ignore")
res = _id, worker(_id)
try:
old_level = root.level
root.setLevel(logging.DEBUG)
res = _id, worker(_id)
finally:
root.setLevel(old_level)

return res
except SkipSynthesisError:
return _id, None
@@ -186,12 +250,26 @@ def run_master(

L.info("Running %d iterations.", len(master.task_ids))

# Run the worker
random.shuffle(master.task_ids)
results = Parallel(
n_jobs=nb_jobs,
verbose=verbose,
)(delayed(_wrap_worker)(i, worker, logger_kwargs) for i in master.task_ids)

# Gather the results
master.finalize(dict(results))
try:
# Keep current log level to reset afterwards
root = logging.getLogger()
old_level = root.level
if logger_kwargs is not None:
root.setLevel(logging.DEBUG)

# Run the worker
random.shuffle(master.task_ids)
results = Parallel(
n_jobs=nb_jobs,
verbose=verbose,
)(delayed(_wrap_worker)(i, worker, logger_kwargs) for i in master.task_ids)

# Gather the results
master.finalize(dict(results))

finally:
        # This is useful only when using joblib with 1 process
root.setLevel(old_level)
for i in root.handlers:
if isinstance(i, DebugingFileHandler):
root.removeHandler(i)
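
As a rough usage sketch of the two helper classes added above (not part of the commit; the file name, logger name and messages below are arbitrary), a DebugingFileHandler carrying an IdProcessingFormatter can be wired up by hand, which is essentially what _wrap_worker does for each worker process:

import logging

from synthesis_workflow.tools import DebugingFileHandler, IdProcessingFormatter

# One log file per worker; _wrap_worker derives the name from logger_kwargs["log_file"].
handler = DebugingFileHandler("scales-0.log")
handler.setFormatter(IdProcessingFormatter(current_id=0))
handler.setLevel(logging.DEBUG)

# Like _wrap_worker, attach the handler to the root logger so that records emitted
# by region-grower's own loggers propagate into the per-worker file.
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(handler)

# Written to scales-0.log as
# "... - region_grower - DEBUG -- [WORKER TASK ID=0] Default barcode scale: {...}"
logging.getLogger("region_grower").debug('Default barcode scale: {"scale": 1.0}')

# When the same process is reused for another task, only the ID in the prefix changes.
handler.formatter.set_id(1)
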
52 changes: 34 additions & 18 deletions synthesis_workflow/validation.py
@@ -1,5 +1,6 @@
"""Functions for validation of synthesis to be used by luigi tasks."""
import json
import glob
import logging
import os
import re
@@ -677,7 +678,7 @@ def plot_path_distance_fits(

def parse_log(
log_file,
apical_target_length_regex,
neuron_type_position_regex,
default_scale_regex,
target_scale_regex,
neurite_hard_limit_regex,
@@ -689,25 +690,34 @@
def _search(pattern, line, data):
group = re.search(pattern, line)
if group:
data.append(json.loads(group.group(1)))
groups = group.groups()
new_data = json.loads(groups[1])
new_data["worker_task_id"] = groups[0]
data.append(new_data)

# List log files
log_files = glob.glob(log_file + "*")

# Read log file
with open(log_file) as f:
lines = f.readlines()
all_lines = []
for file in log_files:
with open(file) as f:
lines = f.readlines()
all_lines.extend(lines)

# Get data from log
apical_target_length_data = []
neuron_type_position_data = []
default_scale_data = []
target_scale_data = []
neurite_hard_limit_data = []
for line in lines:
_search(apical_target_length_regex, line, apical_target_length_data)
for line in all_lines:
_search(neuron_type_position_regex, line, neuron_type_position_data)
_search(default_scale_regex, line, default_scale_data)
_search(target_scale_regex, line, target_scale_data)
_search(neurite_hard_limit_regex, line, neurite_hard_limit_data)

# Format data
apical_target_length_df = pd.DataFrame(apical_target_length_data)
neuron_type_position_df = pd.DataFrame(neuron_type_position_data)
default_scale_df = pd.DataFrame(default_scale_data)
target_scale_df = pd.DataFrame(target_scale_data)
neurite_hard_limit_df = pd.DataFrame(neurite_hard_limit_data)
@@ -720,10 +730,10 @@ def _pos_to_xyz(df, drop=True):
df.drop(columns=["position"], inplace=True)

# Format positions
_pos_to_xyz(apical_target_length_df)
_pos_to_xyz(neuron_type_position_df)

def _compute_min_max_scales(df, key, name_min, name_max):
groups = df.groupby("uuid")
groups = df.groupby("worker_task_id")
neurite_hard_min = groups[key].min().rename(name_min).reset_index()
neurite_hard_max = groups[key].max().rename(name_max).reset_index()
return neurite_hard_min, neurite_hard_max
@@ -740,24 +750,30 @@ def _compute_min_max_scales(df, key, name_min, name_max):
)

# Merge data
result_data = apical_target_length_df
result_data = neuron_type_position_df
result_data = pd.merge(
result_data, default_min, on="uuid", suffixes=("", "_default_min")
result_data, default_min, on="worker_task_id", suffixes=("", "_default_min")
)
result_data = pd.merge(
result_data, default_max, on="uuid", suffixes=("", "_default_max")
result_data, default_max, on="worker_task_id", suffixes=("", "_default_max")
)
result_data = pd.merge(
result_data, target_min, on="uuid", suffixes=("", "_target_min")
result_data, target_min, on="worker_task_id", suffixes=("", "_target_min")
)
result_data = pd.merge(
result_data, target_max, on="uuid", suffixes=("", "_target_max")
result_data, target_max, on="worker_task_id", suffixes=("", "_target_max")
)
result_data = pd.merge(
result_data, neurite_hard_min, on="uuid", suffixes=("", "_hard_limit_min")
result_data,
neurite_hard_min,
on="worker_task_id",
suffixes=("", "_hard_limit_min"),
)
result_data = pd.merge(
result_data, neurite_hard_max, on="uuid", suffixes=("", "_hard_limit_max")
result_data,
neurite_hard_max,
on="worker_task_id",
suffixes=("", "_hard_limit_max"),
)

return result_data
@@ -793,7 +809,7 @@ def plot_scale_statistics(mtypes, scale_data, output_dir="scales", dpi=100):
if mtypes is None:
mtypes = scale_data["mtype"].unique().tolist()
for col in scale_data.columns:
if col in ["uuid", "mtype", "x", "y", "z"]:
if col in ["worker_task_id", "mtype", "x", "y", "z"]:
continue
ax = scale_data[["mtype", col]].boxplot(by="mtype")

File renamed without changes.