Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Command play all only affects active processes. #4671

Merged
merged 6 commits into from
Mar 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion aiida/cmdline/commands/cmd_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ def process_play(processes, all_entries, timeout, wait):
raise click.BadOptionUsage('all', 'cannot specify individual processes and the `--all` flag at the same time.')

if not processes and all_entries:
builder = QueryBuilder().append(ProcessNode, filters={'attributes.paused': True})
filters = CalculationQueryBuilder().get_filters(process_state=('created', 'waiting', 'running'), paused=True)
builder = QueryBuilder().append(ProcessNode, filters=filters)
processes = builder.all(flat=True)

futures = {}
Expand Down
170 changes: 73 additions & 97 deletions tests/cmdline/commands/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
# For further information please visit http://www.aiida.net #
###########################################################################
"""Tests for `verdi process`."""
import subprocess
import sys
import time
import asyncio
from concurrent.futures import Future
Expand All @@ -23,7 +21,6 @@
from aiida.cmdline.commands import cmd_process
from aiida.common.links import LinkType
from aiida.common.log import LOG_LEVEL_REPORT
from aiida.manage.manager import get_manager
from aiida.orm import CalcJobNode, WorkflowNode, WorkFunctionNode, WorkChainNode

from tests.utils import processes as test_processes
Expand All @@ -33,100 +30,6 @@ def get_result_lines(result):
return [e for e in result.output.split('\n') if e]


class TestVerdiProcessDaemon(AiidaTestCase):
"""Tests for `verdi process` that require a running daemon."""

TEST_TIMEOUT = 5.

def setUp(self):
super().setUp()
from aiida.cmdline.utils.common import get_env_with_venv_bin
from aiida.engine.daemon.client import DaemonClient
from aiida.manage.configuration import get_config

# Add the current python path to the environment that will be used for the daemon sub process. This is necessary
# to guarantee the daemon can also import all the classes that are defined in this `tests` module.
env = get_env_with_venv_bin()
env['PYTHONPATH'] = ':'.join(sys.path)

profile = get_config().current_profile
self.daemon_client = DaemonClient(profile)
self.daemon = subprocess.Popen(
self.daemon_client.cmd_string.split(), stderr=sys.stderr, stdout=sys.stdout, env=env
)
self.runner = get_manager().create_runner(rmq_submit=True)
self.cli_runner = CliRunner()

def tearDown(self):
import os
import signal

os.kill(self.daemon.pid, signal.SIGTERM)
super().tearDown()

@pytest.mark.skip(reason='fails to complete randomly (see issue #4731)')
@pytest.mark.requires_rmq
def test_pause_play_kill(self):
"""
Test the pause/play/kill commands
"""
# pylint: disable=no-member
from aiida.orm import load_node

calc = self.runner.submit(test_processes.WaitProcess)
start_time = time.time()
while calc.process_state is not plumpy.ProcessState.WAITING:
if time.time() - start_time >= self.TEST_TIMEOUT:
self.fail('Timed out waiting for process to enter waiting state')

# Make sure that calling any command on a non-existing process id will not except but print an error
# To simulate a process without a corresponding task, we simply create a node and store it. This node will not
# have an associated task at RabbitMQ, but it will be a valid `ProcessNode` so it will pass the initial
# filtering of the `verdi process` commands
orphaned_node = WorkFunctionNode().store()
non_existing_process_id = str(orphaned_node.pk)
for command in [cmd_process.process_pause, cmd_process.process_play, cmd_process.process_kill]:
result = self.cli_runner.invoke(command, [non_existing_process_id])
self.assertClickResultNoException(result)
self.assertIn('Error:', result.output)

self.assertFalse(calc.paused)
result = self.cli_runner.invoke(cmd_process.process_pause, [str(calc.pk)])
self.assertIsNone(result.exception, result.output)

# We need to make sure that the process is picked up by the daemon and put in the Waiting state before we start
# running the CLI commands, so we add a broadcast subscriber for the state change, which when hit will set the
# future to True. This will be our signal that we can start testing
waiting_future = Future()
filters = kiwipy.BroadcastFilter(
lambda *args, **kwargs: waiting_future.set_result(True), sender=calc.pk, subject='state_changed.*.waiting'
)
self.runner.communicator.add_broadcast_subscriber(filters)

# The process may already have been picked up by the daemon and put in the waiting state, before the subscriber
# got the chance to attach itself, making it have missed the broadcast. That's why check if the state is already
# waiting, and if not, we run the loop of the runner to start waiting for the broadcast message. To make sure
# that we have the latest state of the node as it is in the database, we force refresh it by reloading it.
calc = load_node(calc.pk)
if calc.process_state != plumpy.ProcessState.WAITING:
self.runner.loop.run_until_complete(asyncio.wait_for(waiting_future, timeout=5.0))

# Here we now that the process is with the daemon runner and in the waiting state so we can starting running
# the `verdi process` commands that we want to test
result = self.cli_runner.invoke(cmd_process.process_pause, ['--wait', str(calc.pk)])
self.assertIsNone(result.exception, result.output)
self.assertTrue(calc.paused)

result = self.cli_runner.invoke(cmd_process.process_play, ['--wait', str(calc.pk)])
self.assertIsNone(result.exception, result.output)
self.assertFalse(calc.paused)

result = self.cli_runner.invoke(cmd_process.process_kill, ['--wait', str(calc.pk)])
self.assertIsNone(result.exception, result.output)
self.assertTrue(calc.is_terminated)
self.assertTrue(calc.is_killed)


class TestVerdiProcess(AiidaTestCase):
"""Tests for `verdi process`."""

Expand Down Expand Up @@ -490,3 +393,76 @@ def test_multiple_processes(self):
self.assertIn('No callers found', get_result_lines(result)[0])
self.assertIn(str(self.node_root.pk), get_result_lines(result)[1])
self.assertIn(str(self.node_root.pk), get_result_lines(result)[2])


@pytest.mark.skip(reason='fails to complete randomly (see issue #4731)')
@pytest.mark.requires_rmq
@pytest.mark.usefixtures('with_daemon', 'clear_database_before_test')
@pytest.mark.parametrize('cmd_try_all', (True, False))
def test_pause_play_kill(cmd_try_all, run_cli_command):
"""
Test the pause/play/kill commands
"""
# pylint: disable=no-member, too-many-locals
from aiida.cmdline.commands.cmd_process import process_pause, process_play, process_kill
from aiida.manage.manager import get_manager
from aiida.engine import ProcessState
from aiida.orm import load_node

runner = get_manager().create_runner(rmq_submit=True)
calc = runner.submit(test_processes.WaitProcess)

test_daemon_timeout = 5.
start_time = time.time()
while calc.process_state is not plumpy.ProcessState.WAITING:
if time.time() - start_time >= test_daemon_timeout:
raise RuntimeError('Timed out waiting for process to enter waiting state')

# Make sure that calling any command on a non-existing process id will not except but print an error
# To simulate a process without a corresponding task, we simply create a node and store it. This node will not
# have an associated task at RabbitMQ, but it will be a valid `ProcessNode` with and active state, so it will
# pass the initial filtering of the `verdi process` commands
orphaned_node = WorkFunctionNode()
orphaned_node.set_process_state(ProcessState.RUNNING)
orphaned_node.store()
non_existing_process_id = str(orphaned_node.pk)
for command in [process_pause, process_play, process_kill]:
result = run_cli_command(command, [non_existing_process_id])
assert 'Error:' in result.output

assert not calc.paused
result = run_cli_command(process_pause, [str(calc.pk)])

# We need to make sure that the process is picked up by the daemon and put in the Waiting state before we start
# running the CLI commands, so we add a broadcast subscriber for the state change, which when hit will set the
# future to True. This will be our signal that we can start testing
waiting_future = Future()
filters = kiwipy.BroadcastFilter(
lambda *args, **kwargs: waiting_future.set_result(True), sender=calc.pk, subject='state_changed.*.waiting'
)
runner.communicator.add_broadcast_subscriber(filters)

# The process may already have been picked up by the daemon and put in the waiting state, before the subscriber
# got the chance to attach itself, making it have missed the broadcast. That's why check if the state is already
# waiting, and if not, we run the loop of the runner to start waiting for the broadcast message. To make sure
# that we have the latest state of the node as it is in the database, we force refresh it by reloading it.
calc = load_node(calc.pk)
if calc.process_state != plumpy.ProcessState.WAITING:
runner.loop.run_until_complete(asyncio.wait_for(waiting_future, timeout=5.0))

# Here we now that the process is with the daemon runner and in the waiting state so we can starting running
# the `verdi process` commands that we want to test
result = run_cli_command(process_pause, ['--wait', str(calc.pk)])
assert calc.paused

if cmd_try_all:
cmd_option = '--all'
else:
cmd_option = str(calc.pk)

result = run_cli_command(process_play, ['--wait', cmd_option])
assert not calc.paused

result = run_cli_command(process_kill, ['--wait', str(calc.pk)])
assert calc.is_terminated
assert calc.is_killed
30 changes: 30 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,33 @@ def override_logging():
config.unset_option('logging.aiida_loglevel')
config.unset_option('logging.db_loglevel')
configure_logging(with_orm=True)


@pytest.fixture
def with_daemon():
"""Starts the daemon process and then makes sure to kill it once the test is done."""
import sys
import signal
import subprocess

from aiida.engine.daemon.client import DaemonClient
from aiida.cmdline.utils.common import get_env_with_venv_bin

# Add the current python path to the environment that will be used for the daemon sub process.
# This is necessary to guarantee the daemon can also import all the classes that are defined
# in this `tests` module.
env = get_env_with_venv_bin()
env['PYTHONPATH'] = ':'.join(sys.path)

profile = get_config().current_profile
daemon = subprocess.Popen(
DaemonClient(profile).cmd_string.split(),
stderr=sys.stderr,
stdout=sys.stdout,
env=env,
)

yield

# Note this will always be executed after the yield no matter what happened in the test that used this fixture.
os.kill(daemon.pid, signal.SIGTERM)