Skip to content

Commit

Permalink
Merge pull request #233 from pynbody/pynbody-server-performance
Browse files Browse the repository at this point in the history
Add some performance tracking to pynbody_server.py
  • Loading branch information
apontzen authored Dec 13, 2023
2 parents fa2bae9 + dd6c55e commit 4e79538
Show file tree
Hide file tree
Showing 23 changed files with 571 additions and 93 deletions.
2 changes: 1 addition & 1 deletion tangos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@
from .core import *
from .query import *

__version__ = '1.9.0'
__version__ = '1.9.1'
3 changes: 3 additions & 0 deletions tangos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@
# Property writer: don't bother committing even if a timestep is finished if this time hasn't elapsed:
PROPERTY_WRITER_MINIMUM_TIME_BETWEEN_COMMITS = 300 # seconds

# Minimum time between providing updates to the user during tangos write, when running in parallel
PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES = 600 # seconds

try:
from .config_local import *
except:
Expand Down
1 change: 0 additions & 1 deletion tangos/input_handlers/output_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def enumerate_timestep_extensions(self, parallel=False):
pre_extension_length = len(os.path.join(config.base, self.basename))
steps = glob.glob(os.path.join(config.base, self.basename, "step.*"))
for i in steps:
print(i, i[pre_extension_length:], self.strip_slashes(i[pre_extension_length:]))
yield self.strip_slashes(i[pre_extension_length:])

def get_timestep_properties(self, ts_extension):
Expand Down
5 changes: 4 additions & 1 deletion tangos/input_handlers/pynbody.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,12 @@ def load_timestep_without_caching(self, ts_extension, mode=None):
raise NotImplementedError("Load mode %r is not implemented"%mode)

def load_region(self, ts_extension, region_specification, mode=None):
if mode is None or mode=='server':
if mode is None:
timestep = self.load_timestep(ts_extension, mode)
return timestep[region_specification]
elif mode=='server':
timestep = self.load_timestep(ts_extension, mode)
return timestep.get_view(region_specification)
elif mode=='server-shared-mem':
from ..parallel_tasks import pynbody_server as ps
timestep = self.load_timestep(ts_extension, mode)
Expand Down
15 changes: 14 additions & 1 deletion tangos/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ def __init__(self):
self.buffer = StringIO()
self.handler_buffer = logging.StreamHandler(self.buffer)
self.handler_buffer.setLevel(logging.INFO)
self.handler_buffer.setFormatter(formatter)
self._suspended_handlers = []

def __enter__(self):
self._suspended_handlers = copy.copy(logger.handlers)
for x_handler in self._suspended_handlers:
logger.removeHandler(x_handler)
logger.addHandler(self.handler_buffer)
return self

def __exit__(self, *exc_info):
for x_handler in self._suspended_handlers:
Expand All @@ -34,8 +36,19 @@ def __exit__(self, *exc_info):
def get_output(self):
return self.buffer.getvalue()

def get_output_without_timestamps(self):
lines = self.get_output().split("\n")
result = ""
for l in lines:
try:
result += l.split(" : ", 1)[1]+"\n"
except IndexError:
result += l+"\n"
return result


def set_identity_string(identifier):
global handler_stderr
formatter = logging.Formatter(identifier+"%(asctime)s : %(message)s")
handler_stderr.setFormatter(formatter)
for handler in logger.handlers:
handler.setFormatter(formatter)
37 changes: 23 additions & 14 deletions tangos/parallel_tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
import importlib
import re
import sys
import time
import traceback
import warnings

import tangos.core.creator

from .. import config, core

backend = None
_backend_name = config.default_backend
_num_procs = None # only for multiprocessing backend

_on_exit = [] # list of functions to call when parallelism is shutting down

from .. import log
from ..log import logger
from . import backends, jobs, message
from . import accumulative_statistics, jobs, message


def use(name):
Expand Down Expand Up @@ -51,10 +48,15 @@ def parallelism_is_active():
global _backend_name
return _backend_name != 'null' and backend is not None

def launch(function, args=None):
def launch(function, args=None, backend_kwargs=None):
if args is None:
args = []

if backend_kwargs is None:
backend_kwargs = {}

result = None

# we need to close any existing connections because we may fork, which leads to
# buggy/unreliable behaviour. This should invalidate the session attached to
# any existing objects, which is intended behaviour. If you are using parallel
Expand All @@ -71,15 +73,17 @@ def launch(function, args=None):
try:
core.close_db()
if _backend_name != 'null':
backend.launch(_exec_function_or_server, [function, connection_info, args])
result = backend.launch(_exec_function_or_server, [function, connection_info, args], **backend_kwargs)
else:
function(*args)
result = function(*args)
finally:
if connection_info is not None:
core.init_db(*connection_info)
finally:
deinit_backend()

return result

def distributed(file_list, proc=None, of=None):
"""Distribute a list of tasks between all nodes"""

Expand Down Expand Up @@ -138,13 +142,18 @@ def _server_thread():
else:
obj.process()

def on_exit_parallelism(function):
global _on_exit
_on_exit.append(function)

log.logger.info("Terminating manager process")
def _shutdown_parallelism():
global backend, _on_exit
log.logger.debug("Clearing up process")

for fn in _on_exit:
fn()
_on_exit = []

def _shutdown_parallelism():
global backend
log.logger.info("Terminating worker process")
backend.barrier()
backend.finalize()
backend = None
Expand All @@ -153,6 +162,6 @@ def _shutdown_parallelism():



from . import remote_import
from . import remote_import, shared_set
from .barrier import barrier
from .lock import ExclusiveLock
96 changes: 96 additions & 0 deletions tangos/parallel_tasks/accumulative_statistics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import logging
import time

from ..config import PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES
from ..log import logger
from .message import Message

_new_accumulator_requested_for_ranks = []
_new_accumulator = None
_existing_accumulators = []
class CreateNewAccumulatorMessage(Message):

def process(self):
from . import backend
global _new_accumulator, _new_accumulator_requested_for_ranks, _existing_accumulators
assert issubclass(self.contents, StatisticsAccumulatorBase)
if _new_accumulator is None:
_new_accumulator = self.contents()
_new_accumulator_requested_for_ranks = [self.source]
else:
assert self.source not in _new_accumulator_requested_for_ranks
assert isinstance(_new_accumulator, self.contents)
_new_accumulator_requested_for_ranks.append(self.source)

from . import backend

if len(_new_accumulator_requested_for_ranks) == backend.size()-1:
self._confirm_new_accumulator()

def _confirm_new_accumulator(self):
global _new_accumulator, _new_accumulator_requested_for_ranks, _existing_accumulators
from . import backend, on_exit_parallelism
accumulator_id = len(_existing_accumulators)
_existing_accumulators.append(_new_accumulator)

locally_bound_accumulator = _new_accumulator
logger.debug("Created new accumulator of type %s with id %d" % (locally_bound_accumulator.__class__.__name__, accumulator_id))
on_exit_parallelism(lambda: locally_bound_accumulator.report_to_log_if_needed(logger, 0.05))

_new_accumulator = None
_new_accumulator_requested_for_ranks = []

for destination in range(1, backend.size()):
AccumulatorIdMessage(accumulator_id).send(destination)
class AccumulatorIdMessage(Message):
pass
class AccumulateStatisticsMessage(Message):
def process(self):
global _existing_accumulators
_existing_accumulators[self.contents.id].add(self.contents)

class StatisticsAccumulatorBase:
REPORT_AFTER = PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES
def __init__(self, allow_parallel=False):
"""This is a base class for accumulating statistics, possibly in parallel across multiple processes.
Note that if allow_parallel is True, then all processes must create an instance of this class
(in effect creating the class will act like a barrier). If only some processes create an instance,
they will block, possibly creating a deadlock. This is why allow_parallel defaults to False.
"""
from . import backend, parallelism_is_active
self._last_reported = time.time()
self._parallel = allow_parallel and parallelism_is_active() and backend.rank() != 0
if self._parallel:
logger.debug(f"Registering {self.__class__}")
CreateNewAccumulatorMessage(self.__class__).send(0)
logger.debug(f"Awaiting accumulator id for {self.__class__}")
self.id = AccumulatorIdMessage.receive(0).contents
logger.debug(f"Received accumulator id={ self.id}")

def report_to_server(self):
if self._parallel:
AccumulateStatisticsMessage(self).send(0)
self.reset()

def reset(self):
raise NotImplementedError("This method should be overriden to reset the statistics")

def add(self, other):
raise NotImplementedError("This method should be overriden to add two accumulations together")

def report_to_log(self, logger):
raise NotImplementedError("This method should be overriden to log a statistics report")

def report_to_log_or_server(self, logger):
if self._parallel:
self.report_to_server()
else:
self.report_to_log(logger)

def report_to_log_if_needed(self, logger, after_time=None):
if after_time is None:
after_time = self.REPORT_AFTER
if time.time() - self._last_reported > after_time:
self.report_to_log(logger)
self._last_reported = time.time()
Loading

0 comments on commit 4e79538

Please sign in to comment.