Skip to content

Commit

Permalink
Add config option daemon.worker_process_slots
Browse files Browse the repository at this point in the history
This option is used in the construction of the RabbitMQ communicator and
will determine the maximum number of process tasks that any one daemon
runner can operate on concurrently. If this number is set too high, the
runner can get overwhelmed as there is no logic for it to prioritize
finishing active processes over taking on new processes that have
arrived in the process task queue.
  • Loading branch information
sphuber committed Apr 21, 2020
1 parent 3ef7392 commit 7fd1b81
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 26 deletions.
7 changes: 4 additions & 3 deletions aiida/cmdline/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,13 +490,14 @@ def check_worker_load(active_slots):
:param active_slots: the number of currently active worker slots
"""
from aiida.common.exceptions import CircusCallError
from aiida.cmdline.utils import echo
from aiida.manage.external.rmq import _RMQ_TASK_PREFETCH_COUNT
from aiida.common.exceptions import CircusCallError
from aiida.manage.configuration import get_config

warning_threshold = 0.9 # 90%

slots_per_worker = _RMQ_TASK_PREFETCH_COUNT
config = get_config()
slots_per_worker = config.get_option('daemon.worker_process_slots', config.current_profile.name)

try:
active_workers = get_num_workers()
Expand Down
9 changes: 9 additions & 0 deletions aiida/manage/configuration/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
NO_DEFAULT = ()
DEFAULT_DAEMON_WORKERS = 1
DEFAULT_DAEMON_TIMEOUT = 20 # Default timeout in seconds for circus client calls
DEFAULT_DAEMON_WORKER_PROCESS_SLOTS = 200
VALID_LOG_LEVELS = ['CRITICAL', 'ERROR', 'WARNING', 'REPORT', 'INFO', 'DEBUG']

Option = collections.namedtuple(
Expand Down Expand Up @@ -47,6 +48,14 @@
'description': 'The timeout in seconds for calls to the circus client',
'global_only': False,
},
'daemon.worker_process_slots': {
'key': 'daemon_worker_process_slots',
'valid_type': 'int',
'valid_values': None,
'default': DEFAULT_DAEMON_WORKER_PROCESS_SLOTS,
'description': 'The maximum number of concurrent process tasks that each daemon worker can handle',
'global_only': False,
},
'db.batch_size': {
'key': 'db_batch_size',
'valid_type': 'int',
Expand Down
13 changes: 0 additions & 13 deletions aiida/manage/external/rmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
# know how to avoid warnings. For more info see
# https://github.com/aiidateam/aiida-core/issues/1142
_RMQ_URL = 'amqp://127.0.0.1'
_RMQ_TASK_PREFETCH_COUNT = 100 # This value should become configurable per profile at some point
_RMQ_HEARTBEAT_TIMEOUT = 600 # Maximum that can be set by client, with default RabbitMQ server configuration
_LAUNCH_QUEUE = 'process.queue'
_MESSAGE_EXCHANGE = 'messages'
Expand All @@ -56,18 +55,6 @@ def get_rmq_url(heartbeat_timeout=None):
return url


def get_rmq_config(prefix):
    """Build the configuration dictionary for the RabbitMQ communicators.

    The dictionary combines the URL returned by `get_rmq_url()`, the given prefix and the module level
    task prefetch count.

    :param prefix: a string prefix for the RabbitMQ communication queues and exchanges
    :returns: the configuration dictionary with the keys `url`, `prefix` and `task_prefetch_count`
    """
    return {
        'url': get_rmq_url(),
        'prefix': prefix,
        'task_prefetch_count': _RMQ_TASK_PREFETCH_COUNT,
    }


def get_launch_queue_name(prefix=None):
"""
Return the launch queue name with an optional prefix
Expand Down
20 changes: 15 additions & 5 deletions aiida/manage/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
###########################################################################
# pylint: disable=cyclic-import
"""AiiDA manager for global settings"""

import functools

__all__ = ('get_manager', 'reset_manager')
Expand All @@ -33,6 +32,17 @@ class Manager:
* reset manager cache when loading a new profile
"""

@staticmethod
def get_config():
    """Return the currently loaded configuration.

    :return: current loaded config instance
    :rtype: :class:`~aiida.manage.configuration.config.Config`
    :raises aiida.common.ConfigurationError: if the configuration file could not be found, read or deserialized
    """
    # Imported inside the method body; aliased so it does not share the name of this method.
    from .configuration import get_config as load_config

    return load_config()

@staticmethod
def get_profile():
"""Return the current loaded profile, if any
Expand Down Expand Up @@ -166,10 +176,11 @@ def create_communicator(self, task_prefetch_count=None, with_orm=True):
"""
from aiida.manage.external import rmq
import kiwipy.rmq

profile = self.get_profile()

if task_prefetch_count is None:
task_prefetch_count = rmq._RMQ_TASK_PREFETCH_COUNT # pylint: disable=protected-access
task_prefetch_count = self.get_config().get_option('daemon.worker_process_slots', profile.name)

url = rmq.get_rmq_url()
prefix = profile.rmq_prefix
Expand Down Expand Up @@ -261,11 +272,10 @@ def create_runner(self, with_persistence=True, **kwargs):
:rtype: :class:`aiida.engine.runners.Runner`
"""
from aiida.engine import runners
from .configuration import get_config

config = get_config()
config = self.get_config()
profile = self.get_profile()
poll_interval = 0.0 if profile.is_test_profile else config.get_option('runner.poll.interval')
poll_interval = 0.0 if profile.is_test_profile else config.get_option('runner.poll.interval', profile.name)

settings = {'rmq_submit': False, 'poll_interval': poll_interval}
settings.update(kwargs)
Expand Down
46 changes: 44 additions & 2 deletions tests/cmdline/commands/test_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,38 @@

from aiida.backends.testbase import AiidaTestCase
from aiida.cmdline.commands import cmd_daemon
from aiida.common.extendeddicts import AttributeDict
from aiida.engine.daemon.client import DaemonClient
from aiida.manage.configuration import get_config


class VerdiRunner(CliRunner):
    """`click` `CliRunner` subclass that passes an object holding the current config and profile to the context."""

    def __init__(self, config, profile, **kwargs):
        """Initialise the runner and build the `obj` container that the `Context` requires.

        :param config: the configuration instance to expose under the `config` key
        :param profile: the profile instance to expose under the `profile` key
        :param kwargs: keyword arguments forwarded unchanged to the `CliRunner` constructor
        """
        super().__init__(**kwargs)
        context_content = {'config': config, 'profile': profile}
        self.obj = AttributeDict(context_content)

    def invoke(self, *args, **extra):  # pylint: disable=arguments-differ
        """Invoke the command with `obj` injected into the `extra` keywords.

        The `**extra` keywords are forwarded all the way to the `Context` that finally invokes the command. Some
        `verdi` commands rely on this `obj` being present to retrieve the current active configuration and profile.
        Any `obj` passed in by the caller is overridden by the one stored on this runner.
        """
        extra.update(obj=self.obj)
        return super().invoke(*args, **extra)


class TestVerdiDaemon(AiidaTestCase):
"""Tests for `verdi daemon` commands."""

def setUp(self):
super().setUp()
self.daemon_client = DaemonClient(get_config().current_profile)
self.cli_runner = CliRunner()
self.config = get_config()
self.profile = self.config.current_profile
self.daemon_client = DaemonClient(self.profile)
self.cli_runner = VerdiRunner(self.config, self.profile)

def test_daemon_start(self):
"""Test `verdi daemon start`."""
Expand Down Expand Up @@ -67,6 +88,27 @@ def test_daemon_start_number(self):
finally:
self.daemon_client.stop_daemon(wait=True)

@pytest.mark.skip(reason='Test fails non-deterministically; see issue #3051.')
def test_daemon_start_number_config(self):
    """Test `verdi daemon start` when the `daemon.default_workers` config option has been set."""
    expected_workers = 3
    self.config.set_option('daemon.default_workers', expected_workers, self.profile.name)

    try:
        # No explicit number is passed on the command line, only the option set above.
        start_result = self.cli_runner.invoke(cmd_daemon.start)
        self.assertClickResultNoException(start_result)

        daemon_info = self.daemon_client.get_daemon_info()
        worker_info = self.daemon_client.get_worker_info()

        self.assertIn('status', daemon_info, daemon_info)
        self.assertEqual(daemon_info['status'], 'ok', daemon_info)

        self.assertIn('info', worker_info, worker_info)
        self.assertEqual(len(worker_info['info']), expected_workers, worker_info)
    finally:
        # Always stop the daemon, also when an assertion above failed.
        self.daemon_client.stop_daemon(wait=True)

def test_foreground_multiple_workers(self):
"""Test `verdi daemon start` in foreground with more than one worker will fail."""
try:
Expand Down
8 changes: 5 additions & 3 deletions tests/cmdline/commands/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def setUp(self):
aiida.cmdline.utils.common.get_num_workers = lambda: 1

def tearDown(self):
# Reset the redifined function
# Reset the redefined function
import aiida.cmdline.utils.common
aiida.cmdline.utils.common.get_num_workers = self.real_get_num_workers
super().tearDown()
Expand All @@ -395,10 +395,12 @@ def test_list_worker_slot_warning(self):
that the warning message is displayed to the user when running `verdi process list`
"""
from aiida.engine import ProcessState
from aiida.manage.configuration import get_config

# Get the number of allowed processes per worker:
from aiida.manage.external.rmq import _RMQ_TASK_PREFETCH_COUNT
limit = int(_RMQ_TASK_PREFETCH_COUNT * 0.9)
config = get_config()
worker_process_slots = config.get_option('daemon.worker_process_slots', config.current_profile.name)
limit = int(worker_process_slots * 0.9)

# Create additional active nodes such that we have 90% of the active slot limit
for _ in range(limit):
Expand Down

0 comments on commit 7fd1b81

Please sign in to comment.