Skip to content

Commit

Permalink
Adds a base sigint_handler() method to the CommandSet.
Browse files Browse the repository at this point in the history
The sigint_handler() in Cmd will now delegate to the CommandSet.

Addresses #1197
  • Loading branch information
anselor committed Sep 27, 2023
1 parent 9886b82 commit 27f3666
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 2 deletions.
17 changes: 15 additions & 2 deletions cmd2/cmd2.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,9 @@ def __init__(
self.suggest_similar_command = suggest_similar_command
self.default_suggestion_message = "Did you mean {}?"

# the current command being executed
self.current_command: Optional[Statement] = None

def find_commandsets(self, commandset_type: Type[CommandSet], *, subclass_match: bool = False) -> List[CommandSet]:
"""
Find all CommandSets that match the provided CommandSet type.
Expand Down Expand Up @@ -2363,7 +2366,13 @@ def sigint_handler(self, signum: int, _: FrameType) -> None:

# Check if we are allowed to re-raise the KeyboardInterrupt
if not self.sigint_protection:
self._raise_keyboard_interrupt()
raise_interrupt = True
if self.current_command is not None:
command_set = self.find_commandset_for_command(self.current_command.command)
if command_set is not None:
raise_interrupt = not command_set.sigint_handler()
if raise_interrupt:
self._raise_keyboard_interrupt()

def _raise_keyboard_interrupt(self) -> None:
"""Helper function to raise a KeyboardInterrupt"""
Expand Down Expand Up @@ -2953,7 +2962,11 @@ def onecmd(self, statement: Union[Statement, str], *, add_to_history: bool = Tru
):
self.history.append(statement)

stop = func(statement)
try:
self.current_command = statement
stop = func(statement)
finally:
self.current_command = None

else:
stop = self.default(statement)
Expand Down
9 changes: 9 additions & 0 deletions cmd2/command_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,12 @@ def remove_settable(self, name: str) -> None:
del self._settables[name]
except KeyError:
raise KeyError(name + " is not a settable parameter")

def sigint_handler(self) -> bool:
"""
Handle a SIGINT that occurred for a command in this CommandSet.
:return: True if this completes the interrupt handling and no KeyboardInterrupt will be raised.
False to raise a KeyboardInterrupt.
"""
return False
36 changes: 36 additions & 0 deletions tests_isolated/test_commandset/test_commandset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import argparse
import signal
from typing import (
List,
)
Expand Down Expand Up @@ -197,6 +198,9 @@ def test_load_commands(command_sets_manual, capsys):
assert command_sets_manual.find_commandsets(CommandSetA)[0] is cmd_set
assert command_sets_manual.find_commandset_for_command('elderberry') is cmd_set

out = command_sets_manual.app_cmd('apple')
assert 'Apple!' in out.stdout

# Make sure registration callbacks ran
out, err = capsys.readouterr()
assert "in on_register now" in out
Expand Down Expand Up @@ -573,6 +577,38 @@ def test_subcommands(command_sets_manual):
command_sets_manual.unregister_command_set(base_cmds)


def test_commandset_sigint(command_sets_manual):
# shows that the command is able to continue execution if the sigint_handler
# returns True that we've handled interrupting the command.
class SigintHandledCommandSet(cmd2.CommandSet):
def do_foo(self, _):
self._cmd.poutput('in foo')
self._cmd.sigint_handler(signal.SIGINT, None)
self._cmd.poutput('end of foo')

def sigint_handler(self) -> bool:
return True

cs1 = SigintHandledCommandSet()
command_sets_manual.register_command_set(cs1)
out = command_sets_manual.app_cmd('foo')
assert 'in foo' in out.stdout
assert 'end of foo' in out.stdout

# shows that the command is interrupted if we don't report we've handled the sigint
class SigintUnhandledCommandSet(cmd2.CommandSet):
def do_bar(self, _):
self._cmd.poutput('in do bar')
self._cmd.sigint_handler(signal.SIGINT, None)
self._cmd.poutput('end of do bar')

cs2 = SigintUnhandledCommandSet()
command_sets_manual.register_command_set(cs2)
out = command_sets_manual.app_cmd('bar')
assert 'in do bar' in out.stdout
assert 'end of do bar' not in out.stdout


def test_nested_subcommands(command_sets_manual):
base_cmds = LoadableBase(1)
pasta_cmds = LoadablePastaStir(1)
Expand Down

0 comments on commit 27f3666

Please sign in to comment.