Skip to content

Commit

Permalink
#159 Existing work on multiprocessing.
Browse files Browse the repository at this point in the history
  • Loading branch information
james-delisle committed Jul 20, 2021
1 parent 8c8dfbc commit d84314e
Show file tree
Hide file tree
Showing 10 changed files with 276 additions and 175 deletions.
1 change: 0 additions & 1 deletion ridt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
bar_format = '{l_bar}{bar:30}{r_bar}{bar:-10b}'
bar_args = {
"position": 0,
"leave": True,
"bar_format": bar_format
}
62 changes: 57 additions & 5 deletions ridt/container/eddydiffusionrun.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
from multiprocessing import Manager
from multiprocessing import Pool
from multiprocessing import current_process
from multiprocessing.managers import DictProxy

from tqdm import tqdm

from miniutils import parallel_progbar

from typing import List

from numpy import squeeze

from ridt import bar_args
from ridt.base import ComputationalSpace

from ridt.config import RIDTConfig
Expand All @@ -16,6 +28,37 @@
from ridt.analysis import Exposure


def run(setting: RIDTConfig, data_store: DictProxy, geometries: List[str]) -> None:
"""Evaluates the model for a set of parameters, for all geometries.
Loops over all monitor locations that have been selected for evaluation
and evaluates them over their respective domains. Writes output to
:attr:`data_store`.
Parameters
----------
settings : :class:`~.RIDTConfig`
The settings for the run in question.
Returns
-------
None
"""

domain = Domain(setting)
solver = EddyDiffusion(setting)
locations = setting.models.eddy_diffusion.monitor_locations

for geometry in geometries:
# print(f"Evaluating {geometry} monitor locations...")
for name, item in getattr(locations, geometry).items():
# print(f"Evaluating {name}...")
space = getattr(domain, geometry)(item)
output = solver(*getattr(domain, geometry)(item), domain.time)
data_store[setting].add(geometry, name, squeeze(output))


class EddyDiffusionRun:
"""The class which orchestrates an Eddy Diffusion model run.
Expand Down Expand Up @@ -52,7 +95,8 @@ def __init__(self, settings: RIDTConfig, outdir: str):
"""
self.settings = settings
self.outdir = outdir
self.data_store = BatchDataStore()
self.manager = Manager()
self.data_store = BatchDataStore(self.manager)
self.exposure_store = None
self.space = self.prepare()
self.evaluate()
Expand Down Expand Up @@ -93,10 +137,18 @@ def evaluate(self) -> None:
None
"""
for setting in self.space.space:
count = f"{self.space.linear_index(setting) + 1}/{len(self.space)}"
print(f"Evaluating computational space element {count}")
self.run(setting)
pool = Pool(2)
inp = []
for item in self.space.space:
self.data_store.add_run(item)
inp.append((item, self.data_store.store, self.geometries))

parallel_progbar(run, inp, nprocs=6, starmap=True, **bar_args)
pool.close()
# for setting in self.space.space:
# count = f"{self.space.linear_index(setting) + 1}/{len(self.space)}"
# print(f"Evaluating computational space element {count}")
# self.run(setting)

def run(self, setting: RIDTConfig) -> None:
"""Evaluates the model for a set of parameters, for all geometries.
Expand Down
53 changes: 45 additions & 8 deletions ridt/container/wellmixedrun.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from multiprocessing import Manager
from multiprocessing import Pool
from multiprocessing.managers import DictProxy

from ridt.base import ComputationalSpace
from ridt.equation import WellMixed

Expand All @@ -15,6 +19,27 @@
BF = '{l_bar}{bar:30}{r_bar}{bar:-10b}'


def run(setting: RIDTConfig, data_store: DictProxy) -> None:
"""Evaluates the model for a set of parameters.
Writes output to :attr:`data_store`.
Parameters
----------
settings : :class:`~.RIDTConfig`
The settings for the run in question.
Returns
-------
None
"""
domain = Domain(setting)
solver = WellMixed(setting)
output = solver(domain.time)
data_store[setting].add("points", "well_mixed", output)


class WellMixedRun:
"""The class which orchestrates an Well Mixed model run.
Expand Down Expand Up @@ -51,7 +76,8 @@ def __init__(self, settings: RIDTConfig, output_dir: str):
"""
self.settings = settings
self.outdir = output_dir
self.data_store = BatchDataStore()
self.manager = Manager()
self.data_store = BatchDataStore(self.manager)
self.exposure_store = None
self.space = self.prepare()
self.evaluate()
Expand Down Expand Up @@ -84,21 +110,32 @@ def evaluate(self) -> None:
"""
print("Evaluating model over domain... ")
for setting in self.space.space:
count = f"{self.space.linear_index(setting) + 1}/{len(self.space)}"
print(f"Evaluating computational space element {count}")
self.run(setting)
count = 4
inp = []
for item in self.space.space:
self.data_store.add_run(item)
inp.append((item, self.data_store.store))
with Pool(processes=4) as pool:
pool.starmap(run, inp)
# for setting in self.space.space:
# count = f"{self.space.linear_index(setting) + 1}/{len(self.space)}"
# print(f"Evaluating computational space element {count}")
# self.run(setting)
# for setting in self.space.space:
# count = f"{self.space.linear_index(setting) + 1}/{len(self.space)}"
# print(f"Evaluating computational space element {count}")
# self.run(setting)

def run(self, setting: RIDTConfig) -> None:
"""Evaluates the model for a set of parameters.
Writes output to :attr:`data_store`.
Parameters
----------
settings : :class:`~.RIDTConfig`
The settings for the run in question.
Returns
-------
None
Expand All @@ -109,7 +146,7 @@ def run(self, setting: RIDTConfig) -> None:
solver = WellMixed(setting)
output = solver(domain.time)
self.data_store[setting].add("points", "well_mixed", output)

def compute_exposure(self):
"""Computes the exposure from the concentration data.
Expand Down
9 changes: 6 additions & 3 deletions ridt/data/batchdatastore.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from multiprocessing import Manager

from ridt.base import Error

from ridt.config import RIDTConfig
Expand All @@ -19,11 +21,12 @@ class BatchDataStore:
"""

def __init__(self):
def __init__(self, manager: Manager):
"""The :class:`BatchDataStore` constructor.
"""
self.store = dict()
self.manager = manager
self.store = manager.dict()

def add_run(self, setting: RIDTConfig) -> None:
"""Add a new empty data store for a new settings instance.
Expand All @@ -41,7 +44,7 @@ def add_run(self, setting: RIDTConfig) -> None:
"""
if setting not in self.store:
self.store[setting] = DataStore()
self.store[setting] = DataStore(self.manager)
else:
pass

Expand Down
16 changes: 11 additions & 5 deletions ridt/data/datastore.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from multiprocessing import Manager

from typing import Tuple
from typing import Dict

from numpy import ndarray
from numpy import nanargmax
Expand Down Expand Up @@ -46,15 +49,18 @@ class Dimensions:
planes = 3
domain = 4

def __init__(self):
def __init__(self, manager: Manager):
"""The :class:`DataStore` constructor.
"""
self.points = dict()
self.lines = dict()
self.planes = dict()
self.domain = dict()
self.points = manager.dict()
self.lines = manager.dict()
self.planes = manager.dict()
self.domain = manager.dict()

def get_sub_store(self, geometry: str) -> Dict[str, ndarray]:
return getattr(self, geometry).copy()

def add(self, geometry: str, id: str, data: ndarray)-> None:
"""Adds a new item to the data store.
Expand Down
2 changes: 1 addition & 1 deletion ridt/data/datastorewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,5 @@ def write(self, data_store: DataStore) -> None:
ConfigFileWriter(self.dir_agent.outdir, "config.json", self.setting.__source__)
for geometry in self.geometries:
self.dir_agent.create_data_dir(geometry, self.quantity)
for id in getattr(data_store, geometry):
for id in data_store.get_sub_store(geometry):
save(join(self.dir_agent.ddir, id), data_store.get(geometry, id))
9 changes: 7 additions & 2 deletions ridt/equation/eddy_diffusion.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from multiprocessing import current_process

import warnings

import numpy
Expand Down Expand Up @@ -241,7 +243,10 @@ def time(self):
The :mod:`tqdm` iterable over the :attr:`time` iterable
"""
return tqdm(enumerate(self.t), total=len(self.t), **bar_args)
p = current_process()
i = p._identity[0]
# return tqdm(enumerate(self.t), position=(i % self.position_mod) + 1, total=len(self.t), desc=self.descriptor+f"/{i%self.position_mod}", **bar_args)
return enumerate(self.t)

def romberg(self, time: float, source: Source, idx: int, idy: int, idz: int) -> float:
""" Performs romberg integration at the given grid location.
Expand Down Expand Up @@ -360,7 +365,7 @@ def log_start(self, name: str, id: str) -> None:
None
"""
print(f"Evaluating {name} source (id: {id}) for each time...")
pass

def instantaneous(self):
"""Evaluate all instanteneous sources.
Expand Down
1 change: 0 additions & 1 deletion ridt/equation/well_mixed.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ def __call__(self, t: FloatList):
The calculated concentration values.
"""

modes = ["instantaneous", "infinite_duration", "fixed_duration"]

for mode in modes:
Expand Down
Empty file.
Loading

0 comments on commit d84314e

Please sign in to comment.