Skip to content

Commit

Permalink
perf(Ensemble): Re-initialize NeuralNetworks in processes to allow custom loss functions
Browse files Browse the repository at this point in the history
  • Loading branch information
muellerdo committed Jun 13, 2022
1 parent d9b7e58 commit 816fea3
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 16 deletions.
53 changes: 49 additions & 4 deletions aucmedi/ensemble/bagging.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ class Bagging:
This can result in redundant image preparation if `prepare_images=True`.
??? warning "NeuralNetwork re-initialization"
The passed NeuralNetwork for the train() and predict() functions of the Bagging class will be re-initialized!
Attention: Metrics are not passed to the processes due to pickling issues.
??? info "Technical Details"
For the training and inference process, each model will create an individual process via the Python multiprocessing package.
Expand Down Expand Up @@ -166,6 +171,24 @@ def train(self, training_generator, epochs=20, iterations=None,
separator=',', append=True)
callbacks.extend([cb_mc, cb_cl])

# Gather NeuralNetwork parameters
model_paras = {
"n_labels": self.model_template.n_labels,
"channels": self.model_template.channels,
"input_shape": self.model_template.input_shape,
"architecture": self.model_template.architecture,
"pretrained_weights": self.model_template.pretrained_weights,
"loss": self.model_template.loss,
"metrics": None,
"activation_output": self.model_template.activation_output,
"fcl_dropout": self.model_template.fcl_dropout,
"meta_variables": self.model_template.meta_variables,
"learning_rate": self.model_template.learning_rate,
"batch_queue_size": self.model_template.batch_queue_size,
"workers": self.model_template.workers,
"multiprocessing": self.model_template.multiprocessing,
}

# Gather DataGenerator parameters
datagen_paras = {"path_imagedir": temp_dg.path_imagedir,
"batch_size": temp_dg.batch_size,
Expand Down Expand Up @@ -196,7 +219,7 @@ def train(self, training_generator, epochs=20, iterations=None,
process_queue = mp.Queue()
process_train = mp.Process(target=__training_process__,
args=(process_queue,
self.model_template,
model_paras,
data,
datagen_paras,
parameters_training))
Expand Down Expand Up @@ -284,11 +307,29 @@ def predict(self, prediction_generator, aggregate="mean",
path_model = os.path.join(path_model_dir,
"cv_" + str(i) + ".model.hdf5")

# Gather NeuralNetwork parameters
model_paras = {
"n_labels": self.model_template.n_labels,
"channels": self.model_template.channels,
"input_shape": self.model_template.input_shape,
"architecture": self.model_template.architecture,
"pretrained_weights": self.model_template.pretrained_weights,
"loss": self.model_template.loss,
"metrics": None,
"activation_output": self.model_template.activation_output,
"fcl_dropout": self.model_template.fcl_dropout,
"meta_variables": self.model_template.meta_variables,
"learning_rate": self.model_template.learning_rate,
"batch_queue_size": self.model_template.batch_queue_size,
"workers": self.model_template.workers,
"multiprocessing": self.model_template.multiprocessing,
}

# Start inference process for fold i
process_queue = mp.Queue()
process_pred = mp.Process(target=__prediction_process__,
args=(process_queue,
self.model_template,
model_paras,
path_model,
datagen_paras))
process_pred.start()
Expand Down Expand Up @@ -356,7 +397,7 @@ def load(self, directory_path):
# Subroutines #
#-----------------------------------------------------#
# Internal function for training a NeuralNetwork model in a separate process
def __training_process__(queue, model, data, datagen_paras, train_paras):
def __training_process__(queue, model_paras, data, datagen_paras, train_paras):
(train_x, train_y, train_m, test_x, test_y, test_m) = data
# Build training DataGenerator
cv_train_gen = DataGenerator(train_x,
Expand Down Expand Up @@ -396,13 +437,15 @@ def __training_process__(queue, model, data, datagen_paras, train_paras):
loader=datagen_paras["loader"],
workers=datagen_paras["workers"],
**datagen_paras["kwargs"])
# Create NeuralNetwork
model = NeuralNetwork(**model_paras)
# Start NeuralNetwork training
cv_history = model.train(cv_train_gen, cv_val_gen, **train_paras)
# Store result in cache (which will be returned by the process queue)
queue.put(cv_history)

# Internal function for inference with a fitted NeuralNetwork model in a separate process
def __prediction_process__(queue, model, path_model, datagen_paras):
def __prediction_process__(queue, model_paras, path_model, datagen_paras):
# Create inference DataGenerator
cv_pred_gen = DataGenerator(datagen_paras["samples"],
path_imagedir=datagen_paras["path_imagedir"],
Expand All @@ -422,6 +465,8 @@ def __prediction_process__(queue, model, path_model, datagen_paras):
loader=datagen_paras["loader"],
workers=datagen_paras["workers"],
**datagen_paras["kwargs"])
# Create NeuralNetwork
model = NeuralNetwork(**model_paras)
# Load model weights from disk
model.load(path_model)
# Make prediction
Expand Down
76 changes: 70 additions & 6 deletions aucmedi/ensemble/composite.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import numpy as np
import shutil
# Internal libraries
from aucmedi import DataGenerator
from aucmedi import DataGenerator, NeuralNetwork
from aucmedi.sampling import sampling_split, sampling_kfold
from aucmedi.ensemble.aggregate import aggregate_dict
from aucmedi.ensemble.metalearner import metalearner_dict
Expand Down Expand Up @@ -84,6 +84,11 @@ class Composite:
This can result in redundant image preparation if `prepare_images=True`.
??? warning "NeuralNetwork re-initialization"
The passed NeuralNetwork for the train() and predict() functions of the Composite class will be re-initialized!
Attention: Metrics are not passed to the processes due to pickling issues.
??? info "Technical Details"
For the training and inference process, each model will create an individual process via the Python multiprocessing package.
Expand Down Expand Up @@ -217,6 +222,24 @@ def train(self, training_generator, epochs=20, iterations=None,
separator=',', append=True)
callbacks.extend([cb_mc, cb_cl])

# Gather NeuralNetwork parameters
model_paras = {
"n_labels": self.model_list[i].n_labels,
"channels": self.model_list[i].channels,
"input_shape": self.model_list[i].input_shape,
"architecture": self.model_list[i].architecture,
"pretrained_weights": self.model_list[i].pretrained_weights,
"loss": self.model_list[i].loss,
"metrics": None,
"activation_output": self.model_list[i].activation_output,
"fcl_dropout": self.model_list[i].fcl_dropout,
"meta_variables": self.model_list[i].meta_variables,
"learning_rate": self.model_list[i].learning_rate,
"batch_queue_size": self.model_list[i].batch_queue_size,
"workers": self.model_list[i].workers,
"multiprocessing": self.model_list[i].multiprocessing,
}

# Gather DataGenerator parameters
datagen_paras = {"path_imagedir": temp_dg.path_imagedir,
"batch_size": temp_dg.batch_size,
Expand All @@ -239,8 +262,8 @@ def train(self, training_generator, epochs=20, iterations=None,
process_queue = mp.Queue()
process_train = mp.Process(target=__training_process__,
args=(process_queue,
self.model_list[i],
data,
model_paras,
datagen_paras,
parameters_training))
process_train.start()
Expand Down Expand Up @@ -302,6 +325,24 @@ def train_metalearner(self, training_generator):
path_model = os.path.join(path_model_dir,
"cv_" + str(i) + ".model.hdf5")

# Gather NeuralNetwork parameters
model_paras = {
"n_labels": self.model_list[i].n_labels,
"channels": self.model_list[i].channels,
"input_shape": self.model_list[i].input_shape,
"architecture": self.model_list[i].architecture,
"pretrained_weights": self.model_list[i].pretrained_weights,
"loss": self.model_list[i].loss,
"metrics": None,
"activation_output": self.model_list[i].activation_output,
"fcl_dropout": self.model_list[i].fcl_dropout,
"meta_variables": self.model_list[i].meta_variables,
"learning_rate": self.model_list[i].learning_rate,
"batch_queue_size": self.model_list[i].batch_queue_size,
"workers": self.model_list[i].workers,
"multiprocessing": self.model_list[i].multiprocessing,
}

# Gather DataGenerator parameters
datagen_paras = {"path_imagedir": temp_dg.path_imagedir,
"batch_size": temp_dg.batch_size,
Expand All @@ -324,7 +365,7 @@ def train_metalearner(self, training_generator):
process_queue = mp.Queue()
process_pred = mp.Process(target=__prediction_process__,
args=(process_queue,
self.model_list[i],
model_paras,
path_model,
data_ensemble,
datagen_paras))
Expand Down Expand Up @@ -398,6 +439,24 @@ def predict(self, prediction_generator, return_ensemble=False):
path_model = os.path.join(path_model_dir,
"cv_" + str(i) + ".model.hdf5")

# Gather NeuralNetwork parameters
model_paras = {
"n_labels": self.model_list[i].n_labels,
"channels": self.model_list[i].channels,
"input_shape": self.model_list[i].input_shape,
"architecture": self.model_list[i].architecture,
"pretrained_weights": self.model_list[i].pretrained_weights,
"loss": self.model_list[i].loss,
"metrics": None,
"activation_output": self.model_list[i].activation_output,
"fcl_dropout": self.model_list[i].fcl_dropout,
"meta_variables": self.model_list[i].meta_variables,
"learning_rate": self.model_list[i].learning_rate,
"batch_queue_size": self.model_list[i].batch_queue_size,
"workers": self.model_list[i].workers,
"multiprocessing": self.model_list[i].multiprocessing,
}

# Gather DataGenerator parameters
datagen_paras = {"path_imagedir": temp_dg.path_imagedir,
"batch_size": temp_dg.batch_size,
Expand All @@ -420,7 +479,7 @@ def predict(self, prediction_generator, return_ensemble=False):
process_queue = mp.Queue()
process_pred = mp.Process(target=__prediction_process__,
args=(process_queue,
self.model_list[i],
model_paras,
path_model,
data_test,
datagen_paras))
Expand Down Expand Up @@ -507,7 +566,7 @@ def load(self, directory_path):
# Subroutines #
#-----------------------------------------------------#
# Internal function for training a NeuralNetwork model in a separate process
def __training_process__(queue, model, data, datagen_paras, train_paras):
def __training_process__(queue, data, model_paras, datagen_paras, train_paras):
# Extract data
(train_x, train_y, train_m, test_x, test_y, test_m) = data
# Build training DataGenerator
Expand Down Expand Up @@ -548,13 +607,16 @@ def __training_process__(queue, model, data, datagen_paras, train_paras):
loader=datagen_paras["loader"],
workers=datagen_paras["workers"],
**datagen_paras["kwargs"])
# Create NeuralNetwork
model = NeuralNetwork(**model_paras)
# Start NeuralNetwork training
cv_history = model.train(cv_train_gen, cv_val_gen, **train_paras)
# Store result in cache (which will be returned by the process queue)
queue.put(cv_history)

# Internal function for inference with a fitted NeuralNetwork model in a separate process
def __prediction_process__(queue, model, path_model, data_test, datagen_paras):
def __prediction_process__(queue, model_paras, path_model, data_test,
datagen_paras):
# Extract data
(test_x, test_y, test_m) = data_test
# Create inference DataGenerator
Expand All @@ -576,6 +638,8 @@ def __prediction_process__(queue, model, path_model, data_test, datagen_paras):
loader=datagen_paras["loader"],
workers=datagen_paras["workers"],
**datagen_paras["kwargs"])
# Create NeuralNetwork
model = NeuralNetwork(**model_paras)
# Load model weights from disk
model.load(path_model)
# Make prediction
Expand Down
Loading

0 comments on commit 816fea3

Please sign in to comment.