Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config option daemon.default_workers and daemon.worker_process_slots #3949

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions aiida/cmdline/commands/cmd_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
from aiida.manage.configuration import get_config


def validate_positive_non_zero_integer(ctx, param, value): # pylint: disable=unused-argument,invalid-name
"""Validate that `value` is a positive integer."""
def validate_daemon_workers(ctx, param, value): # pylint: disable=unused-argument,invalid-name
"""Validate the value for the number of daemon workers to start with default set by config."""
if value is None:
value = ctx.obj.config.get_option('daemon.default_workers', ctx.obj.profile.name)

if not isinstance(value, int):
raise click.BadParameter('{} is not an integer'.format(value))

Expand All @@ -43,11 +46,15 @@ def verdi_daemon():

@verdi_daemon.command()
@click.option('--foreground', is_flag=True, help='Run in foreground.')
@click.argument('number', default=1, type=int, callback=validate_positive_non_zero_integer)
@click.argument('number', required=False, type=int, callback=validate_daemon_workers)
@decorators.with_dbenv()
@decorators.check_circus_zmq_version
def start(foreground, number):
"""Start the daemon with NUMBER workers [default=1]."""
"""Start the daemon with NUMBER workers.

If the NUMBER of desired workers is not specified, the default is used, which is determined by the configuration
option `daemon.default_workers`, which if not explicitly changed defaults to 1.
"""
from aiida.engine.daemon.client import get_daemon_client

client = get_daemon_client()
Expand Down Expand Up @@ -196,8 +203,8 @@ def restart(ctx, reset, no_wait):
"""Restart the daemon.

By default will only reset the workers of the running daemon. After the restart the same amount of workers will be
running. If the `--reset` flag is passed, however, the full circus daemon will be stopped and restarted with just
a single worker.
running. If the `--reset` flag is passed, however, the full daemon will be stopped and restarted with the default
number of workers that is started when calling `verdi daemon start` manually.
"""
from aiida.engine.daemon.client import get_daemon_client

Expand All @@ -223,7 +230,7 @@ def restart(ctx, reset, no_wait):

@verdi_daemon.command(hidden=True)
@click.option('--foreground', is_flag=True, help='Run in foreground.')
@click.argument('number', default=1, type=int, callback=validate_positive_non_zero_integer)
@click.argument('number', required=False, type=int, callback=validate_daemon_workers)
@decorators.with_dbenv()
@decorators.check_circus_zmq_version
def start_circus(foreground, number):
Expand Down
7 changes: 4 additions & 3 deletions aiida/cmdline/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,13 +490,14 @@ def check_worker_load(active_slots):

:param active_slots: the number of currently active worker slots
"""
from aiida.common.exceptions import CircusCallError
from aiida.cmdline.utils import echo
from aiida.manage.external.rmq import _RMQ_TASK_PREFETCH_COUNT
from aiida.common.exceptions import CircusCallError
from aiida.manage.configuration import get_config

warning_threshold = 0.9 # 90%

slots_per_worker = _RMQ_TASK_PREFETCH_COUNT
config = get_config()
slots_per_worker = config.get_option('daemon.worker_process_slots', config.current_profile.name)

try:
active_workers = get_num_workers()
Expand Down
18 changes: 18 additions & 0 deletions aiida/manage/configuration/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
__all__ = ('get_option', 'get_option_names', 'parse_option')

NO_DEFAULT = ()
DEFAULT_DAEMON_WORKERS = 1
DEFAULT_DAEMON_TIMEOUT = 20 # Default timeout in seconds for circus client calls
DEFAULT_DAEMON_WORKER_PROCESS_SLOTS = 200
VALID_LOG_LEVELS = ['CRITICAL', 'ERROR', 'WARNING', 'REPORT', 'INFO', 'DEBUG']

Option = collections.namedtuple(
Expand All @@ -30,6 +32,14 @@
'description': 'The polling interval in seconds to be used by process runners',
'global_only': False,
},
'daemon.default_workers': {
'key': 'daemon_default_workers',
'valid_type': 'int',
'valid_values': None,
'default': DEFAULT_DAEMON_WORKERS,
'description': 'The default number of workers to be launched by `verdi daemon start`',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to understand - e.g. if I use incr does this value change?
In which sense is it a default rather than the actual number of daemon workers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

verdi daemon incr is an independent command. It still takes an argument to tell how many workers should be added: verdi daemon incr N will add N workers, with the default being 1. The new option only affects how many workers are started automatically when invoking verdi daemon start. You can already do verdi daemon start 10 to start 10 workers straight away, but now you can configure it to start 10 by default. Saves an entire three key strokes ;)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I just wanted to make sure.
I think some users might assume that if you do verdi daemon incr 1 and then restart the daemon that you would keep the increased number of workers rather than resetting to this default.

It might make sense to mention explicitly in the docstring of verdi daemon incr that these changes are reset whenever you stop the daemon, and that you should use this new config option to make permanent changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is even a bit more subtle than that. Doing verdi daemon restart will restart the workers but not the daemon itself, i.e. you will keep however many workers were running before the restart command. Doing verdi daemon restart --reset literally does verdi daemon stop && verdi daemon start, so in that case it will start whatever is the default for verdi daemon start, which can now be configured through this new option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was already explained in the docstring of verdi daemon restart:

    By default will only reset the workers of the running daemon. After the restart the same amount of workers will be
    running. If the `--reset` flag is passed, however, the full circus daemon will be stopped and restarted with just
    a single worker.

I just noticed that we just have to update the last bit with respect to the new default behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the docstrings of verdi daemon start and verdi daemon restart in the first commit to reflect the new behavior and give more detail

'global_only': False,
},
'daemon.timeout': {
'key': 'daemon_timeout',
'valid_type': 'int',
Expand All @@ -38,6 +48,14 @@
'description': 'The timeout in seconds for calls to the circus client',
'global_only': False,
},
'daemon.worker_process_slots': {
'key': 'daemon_worker_process_slots',
'valid_type': 'int',
'valid_values': None,
'default': DEFAULT_DAEMON_WORKER_PROCESS_SLOTS,
'description': 'The maximum number of concurrent process tasks that each daemon worker can handle',
'global_only': False,
},
'db.batch_size': {
'key': 'db_batch_size',
'valid_type': 'int',
Expand Down
13 changes: 0 additions & 13 deletions aiida/manage/external/rmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
# know how to avoid warnings. For more info see
# https://github.com/aiidateam/aiida-core/issues/1142
_RMQ_URL = 'amqp://127.0.0.1'
_RMQ_TASK_PREFETCH_COUNT = 100 # This value should become configurable per profile at some point
_RMQ_HEARTBEAT_TIMEOUT = 600 # Maximum that can be set by client, with default RabbitMQ server configuration
_LAUNCH_QUEUE = 'process.queue'
_MESSAGE_EXCHANGE = 'messages'
Expand All @@ -56,18 +55,6 @@ def get_rmq_url(heartbeat_timeout=None):
return url


def get_rmq_config(prefix):
    """
    Build the RabbitMQ configuration dictionary for the given prefix.

    :param prefix: a string prefix for the RabbitMQ communication queues and exchanges
    :returns: dictionary with the keys `url`, `prefix` and `task_prefetch_count`
    """
    return {
        'url': get_rmq_url(),
        'prefix': prefix,
        'task_prefetch_count': _RMQ_TASK_PREFETCH_COUNT,
    }


def get_launch_queue_name(prefix=None):
"""
Return the launch queue name with an optional prefix
Expand Down
20 changes: 15 additions & 5 deletions aiida/manage/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
###########################################################################
# pylint: disable=cyclic-import
"""AiiDA manager for global settings"""

import functools

__all__ = ('get_manager', 'reset_manager')
Expand All @@ -33,6 +32,17 @@ class Manager:
* reset manager cache when loading a new profile
"""

@staticmethod
def get_config():
ltalirz marked this conversation as resolved.
Show resolved Hide resolved
"""Return the current config.

:return: current loaded config instance
:rtype: :class:`~aiida.manage.configuration.config.Config`
:raises aiida.common.ConfigurationError: if the configuration file could not be found, read or deserialized
"""
from .configuration import get_config
return get_config()

@staticmethod
def get_profile():
"""Return the current loaded profile, if any
Expand Down Expand Up @@ -166,10 +176,11 @@ def create_communicator(self, task_prefetch_count=None, with_orm=True):
"""
from aiida.manage.external import rmq
import kiwipy.rmq

profile = self.get_profile()

if task_prefetch_count is None:
task_prefetch_count = rmq._RMQ_TASK_PREFETCH_COUNT # pylint: disable=protected-access
task_prefetch_count = self.get_config().get_option('daemon.worker_process_slots', profile.name)

url = rmq.get_rmq_url()
prefix = profile.rmq_prefix
Expand Down Expand Up @@ -261,11 +272,10 @@ def create_runner(self, with_persistence=True, **kwargs):
:rtype: :class:`aiida.engine.runners.Runner`
"""
from aiida.engine import runners
from .configuration import get_config

config = get_config()
config = self.get_config()
profile = self.get_profile()
poll_interval = 0.0 if profile.is_test_profile else config.get_option('runner.poll.interval')
poll_interval = 0.0 if profile.is_test_profile else config.get_option('runner.poll.interval', profile.name)

settings = {'rmq_submit': False, 'poll_interval': poll_interval}
settings.update(kwargs)
Expand Down
2 changes: 1 addition & 1 deletion docs/source/verdi/verdi_user_guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ Below is a list with all available subcommands.
incr Add NUMBER [default=1] workers to the running daemon.
logshow Show the log of the daemon, press CTRL+C to quit.
restart Restart the daemon.
start Start the daemon with NUMBER workers [default=1].
start Start the daemon with NUMBER workers.
status Print the status of the current daemon or all daemons.
stop Stop the daemon.

Expand Down
47 changes: 44 additions & 3 deletions tests/cmdline/commands/test_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,43 @@
# For further information please visit http://www.aiida.net #
###########################################################################
"""Tests for `verdi daemon`."""

from click.testing import CliRunner
import pytest

from aiida.backends.testbase import AiidaTestCase
from aiida.cmdline.commands import cmd_daemon
from aiida.common.extendeddicts import AttributeDict
from aiida.engine.daemon.client import DaemonClient
from aiida.manage.configuration import get_config


class VerdiRunner(CliRunner):
    """`CliRunner` subclass that hands the current config and profile to each invoked command via the context `obj`."""

    def __init__(self, config, profile, **kwargs):
        """Construct the runner and prepare the `obj` container that the `Context` requires.

        :param config: the configuration instance exposed as `obj.config`
        :param profile: the profile instance exposed as `obj.profile`
        :param kwargs: keyword arguments passed through unchanged to `CliRunner`
        """
        super().__init__(**kwargs)
        context_object = {'config': config, 'profile': profile}
        self.obj = AttributeDict(context_object)

    def invoke(self, *args, **extra):  # pylint: disable=arguments-differ
        """Invoke the command with the stored `obj` added to the `extra` keywords.

        The `**extra` keywords are forwarded all the way to the `Context` that finally invokes the command. Some
        `verdi` commands rely on this `obj` being present to retrieve the current active configuration and profile.
        Any `obj` passed by the caller is overwritten by the one stored on this runner.
        """
        extra.update(obj=self.obj)
        return super().invoke(*args, **extra)


class TestVerdiDaemon(AiidaTestCase):
"""Tests for `verdi daemon` commands."""

def setUp(self):
super().setUp()
self.daemon_client = DaemonClient(get_config().current_profile)
self.cli_runner = CliRunner()
self.config = get_config()
self.profile = self.config.current_profile
self.daemon_client = DaemonClient(self.profile)
self.cli_runner = VerdiRunner(self.config, self.profile)

def test_daemon_start(self):
"""Test `verdi daemon start`."""
Expand Down Expand Up @@ -68,6 +88,27 @@ def test_daemon_start_number(self):
finally:
self.daemon_client.stop_daemon(wait=True)

@pytest.mark.skip(reason='Test fails non-deterministically; see issue #3051.')
def test_daemon_start_number_config(self):
    """Test that `verdi daemon start` without NUMBER honours the `daemon.default_workers` config option."""
    expected_workers = 3
    self.config.set_option('daemon.default_workers', expected_workers, self.profile.name)

    try:
        # No NUMBER argument is passed, so the worker count must come from the configuration option
        result = self.cli_runner.invoke(cmd_daemon.start)
        self.assertClickResultNoException(result)

        daemon_response = self.daemon_client.get_daemon_info()
        worker_response = self.daemon_client.get_worker_info()

        self.assertIn('status', daemon_response, daemon_response)
        self.assertEqual(daemon_response['status'], 'ok', daemon_response)

        self.assertIn('info', worker_response, worker_response)
        started_workers = len(worker_response['info'])
        self.assertEqual(started_workers, expected_workers, worker_response)
    finally:
        # Always stop the daemon so a failing assertion does not leave it running for later tests
        self.daemon_client.stop_daemon(wait=True)

def test_foreground_multiple_workers(self):
"""Test `verdi daemon start` in foreground with more than one worker will fail."""
try:
Expand Down
8 changes: 5 additions & 3 deletions tests/cmdline/commands/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def setUp(self):
aiida.cmdline.utils.common.get_num_workers = lambda: 1

def tearDown(self):
# Reset the redifined function
# Reset the redefined function
import aiida.cmdline.utils.common
aiida.cmdline.utils.common.get_num_workers = self.real_get_num_workers
super().tearDown()
Expand All @@ -395,10 +395,12 @@ def test_list_worker_slot_warning(self):
that the warning message is displayed to the user when running `verdi process list`
"""
from aiida.engine import ProcessState
from aiida.manage.configuration import get_config

# Get the number of allowed processes per worker:
from aiida.manage.external.rmq import _RMQ_TASK_PREFETCH_COUNT
limit = int(_RMQ_TASK_PREFETCH_COUNT * 0.9)
config = get_config()
worker_process_slots = config.get_option('daemon.worker_process_slots', config.current_profile.name)
limit = int(worker_process_slots * 0.9)

# Create additional active nodes such that we have 90% of the active slot limit
for _ in range(limit):
Expand Down