From 95e252b0a66416309b90991d208b8b2a2bc1e50d Mon Sep 17 00:00:00 2001 From: Ghost Ops <72981180+GhostOps77@users.noreply.github.com> Date: Sat, 15 Jun 2024 14:20:34 +0530 Subject: [PATCH] Added ReplContext class (#118) * implemented ReplContext class * updated dependencies * Added test cases for ReplContext * Moved ISATTY value to 'globals_' for global access * Fixed flake8 errors --- click_repl/__init__.py | 1 + click_repl/_completer.py | 14 +-- click_repl/_ctx_stack.py | 28 +++++ click_repl/_repl.py | 98 ++++++++++-------- click_repl/core.py | 214 +++++++++++++++++++++++++++++++++++++++ click_repl/globals_.py | 46 +++++++++ setup.cfg | 1 + tests/test_repl_ctx.py | 50 +++++++++ 8 files changed, 400 insertions(+), 52 deletions(-) create mode 100644 click_repl/_ctx_stack.py create mode 100644 click_repl/core.py create mode 100644 click_repl/globals_.py create mode 100644 tests/test_repl_ctx.py diff --git a/click_repl/__init__.py b/click_repl/__init__.py index df3cea8..34f20da 100644 --- a/click_repl/__init__.py +++ b/click_repl/__init__.py @@ -1,4 +1,5 @@ from ._completer import ClickCompleter as ClickCompleter # noqa: F401 +from .core import pass_context as pass_context # noqa: F401 from ._repl import register_repl as register_repl # noqa: F401 from ._repl import repl as repl # noqa: F401 from .exceptions import CommandLineParserError as CommandLineParserError # noqa: F401 diff --git a/click_repl/_completer.py b/click_repl/_completer.py index 67b8bda..1f64fa0 100644 --- a/click_repl/_completer.py +++ b/click_repl/_completer.py @@ -206,16 +206,16 @@ def _get_completion_for_cmd_args( current_args = args[param.nargs * -1 :] # Show only unused opts - already_present = any([ - opt in previous_args for opt in opts - ]) + already_present = any([opt in previous_args for opt in opts]) hide = self.show_only_unused and already_present and not param.multiple # Show only shortest opt - if (self.shortest_only - and not incomplete # just typed a space - # not selecting a value for a longer version of this option - and args[-1] not in opts): + if ( + self.shortest_only + and not incomplete # just typed a space + # not selecting a value for a longer version of this option + and args[-1] not in opts + ): opts = [min(opts, key=len)] for option in opts: diff --git a/click_repl/_ctx_stack.py b/click_repl/_ctx_stack.py new file mode 100644 index 0000000..1046ccd --- /dev/null +++ b/click_repl/_ctx_stack.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .core import ReplContext + + +# To store the ReplContext objects generated throughout the Runtime. +_context_stack: list[ReplContext] = [] + + +def _push_context(ctx: ReplContext) -> None: + """ + Pushes a new REPL context onto the current stack. + + Parameters + ---------- + ctx + The :class:`~click_repl.core.ReplContext` object that should be + added to the REPL context stack. + """ + _context_stack.append(ctx) + + +def _pop_context() -> None: + """Removes the top-level REPL context from the stack.""" + _context_stack.pop() diff --git a/click_repl/_repl.py b/click_repl/_repl.py index 5693f52..0445182 100644 --- a/click_repl/_repl.py +++ b/click_repl/_repl.py @@ -2,13 +2,14 @@ import click import sys -from prompt_toolkit import PromptSession from prompt_toolkit.history import InMemoryHistory from ._completer import ClickCompleter from .exceptions import ClickExit # type: ignore[attr-defined] from .exceptions import CommandLineParserError, ExitReplException, InvalidGroupFormat from .utils import _execute_internal_and_sys_cmds +from .core import ReplContext +from .globals_ import ISATTY, get_current_repl_ctx __all__ = ["bootstrap_prompt", "register_repl", "repl"] @@ -73,8 +74,6 @@ def repl( f"an optional argument '{param.name}' in REPL mode" ) - isatty = sys.stdin.isatty() - # Delete the REPL command from those available, as we don't want to allow # nesting REPLs (note: pass `None` to `pop` as we don't want to error if # REPL command already not present for some reason). @@ -90,58 +89,67 @@ def repl( original_command = available_commands.pop(repl_command_name, None) - if isatty: - prompt_kwargs = bootstrap_prompt(group, prompt_kwargs, group_ctx) - session = PromptSession(**prompt_kwargs) + repl_ctx = ReplContext( + group_ctx, + bootstrap_prompt(group, prompt_kwargs, group_ctx), + get_current_repl_ctx(silent=True), + ) - def get_command(): - return session.prompt() + if ISATTY: + # If stdin is a TTY, prompt the user for input using PromptSession. + def get_command() -> str: + return repl_ctx.session.prompt() # type: ignore else: - get_command = sys.stdin.readline - - while True: - try: - command = get_command() - except KeyboardInterrupt: - continue - except EOFError: - break - - if not command: - if isatty: + # If stdin is not a TTY, read input from stdin directly. + def get_command() -> str: + inp = sys.stdin.readline().strip() + repl_ctx._history.append(inp) + return inp + + with repl_ctx: + while True: + try: + command = get_command() + except KeyboardInterrupt: continue - else: + except EOFError: break - try: - args = _execute_internal_and_sys_cmds( - command, allow_internal_commands, allow_system_commands - ) - if args is None: - continue + if not command: + if ISATTY: + continue + else: + break - except CommandLineParserError: - continue + try: + args = _execute_internal_and_sys_cmds( + command, allow_internal_commands, allow_system_commands + ) + if args is None: + continue - except ExitReplException: - break + except CommandLineParserError: + continue + + except ExitReplException: + break - try: - # The group command will dispatch based on args. - old_protected_args = group_ctx.protected_args try: - group_ctx.protected_args = args - group.invoke(group_ctx) - finally: - group_ctx.protected_args = old_protected_args - except click.ClickException as e: - e.show() - except (ClickExit, SystemExit): - pass - - except ExitReplException: - break + # The group command will dispatch based on args. + old_protected_args = group_ctx.protected_args + try: + group_ctx.protected_args = args + group.invoke(group_ctx) + finally: + group_ctx.protected_args = old_protected_args + except click.ClickException as e: + e.show() + except (ClickExit, SystemExit): + pass + + except ExitReplException: + break if original_command is not None: available_commands[repl_command_name] = original_command diff --git a/click_repl/core.py b/click_repl/core.py new file mode 100644 index 0000000..79d30fa --- /dev/null +++ b/click_repl/core.py @@ -0,0 +1,214 @@ +""" +Core functionalities for managing context of the click_repl app. +""" + +from __future__ import annotations + +from functools import wraps +from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, TypeVar + +from click import Context +from prompt_toolkit import PromptSession +from typing_extensions import Concatenate, Final, ParamSpec, TypeAlias, TypedDict + +from ._ctx_stack import _pop_context, _push_context +from .globals_ import ISATTY, get_current_repl_ctx + +if TYPE_CHECKING: + from prompt_toolkit.formatted_text import AnyFormattedText + + +P = ParamSpec("P") +R = TypeVar("R") +F = TypeVar("F", bound=Callable[..., Any]) + + +__all__ = ["ReplContext", "pass_context"] + + +_PromptSession: TypeAlias = PromptSession[Dict[str, Any]] + + +class ReplContextInfoDict(TypedDict): + group_ctx: Context + prompt_kwargs: dict[str, Any] + session: _PromptSession | None + parent: ReplContext | None + _history: list[str] + + +class ReplContext: + """ + Context object for the REPL sessions. + + This class tracks the depth of nested REPLs, ensuring seamless navigation + between different levels. It facilitates nested REPL scenarios, allowing + multiple levels of interactive REPL sessions. + + Each REPL's properties are stored inside this context class, allowing them to + be accessed and shared with their parent REPL. + + All the settings for each REPL session persist until the session is terminated. + + Parameters + ---------- + group_ctx + The click context object that belong to the CLI/parent Group. + + prompt_kwargs + Extra keyword arguments for + :class:`~prompt_toolkit.shortcuts.PromptSession` class. + + parent + REPL Context object of the parent REPL session, if exists. Otherwise, :obj:`None`. + """ + + __slots__ = ( + "group_ctx", + "prompt_kwargs", + "parent", + "session", + "_history", + ) + + def __init__( + self, + group_ctx: Context, + prompt_kwargs: dict[str, Any] = {}, + parent: ReplContext | None = None, + ) -> None: + """ + Initializes the `ReplContext` class. + """ + session: _PromptSession | None + + if ISATTY: + session = PromptSession(**prompt_kwargs) + + else: + session = None + + self.group_ctx: Final[Context] = group_ctx + """The click context object that belong to the CLI/parent Group.""" + + self.session = session + """Object that's responsible for managing and executing the REPL.""" + + self._history: list[str] = [] + """ + History of past executed commands. + + Used only when :func:`~sys.stdin.isatty` is :obj:`False`. + """ + + self.prompt_kwargs = prompt_kwargs + """ + Extra keyword arguments for + :class:`~prompt_toolkit.shortcuts.PromptSession` class. + """ + + self.parent: Final[ReplContext | None] = parent + """ + REPL Context object of the parent REPL session, if exists. + Otherwise, :obj:`None`. + """ + + def __enter__(self) -> ReplContext: + _push_context(self) + return self + + def __exit__(self, *_: Any) -> None: + _pop_context() + + @property + def prompt(self) -> AnyFormattedText: + """ + The prompt text of the REPL. + + Returns + ------- + prompt_toolkit.formatted_text.AnyFormattedText + The prompt object if :func:`~sys.stdin.isatty` is :obj:`True`, + else :obj:`None`. + """ + if ISATTY and self.session is not None: + return self.session.message + return None + + @prompt.setter + def prompt(self, value: AnyFormattedText) -> None: + if ISATTY and self.session is not None: + self.session.message = value + + def to_info_dict(self) -> ReplContextInfoDict: + """ + Provides a dictionary with minimal info about the current REPL. + + Returns + ------- + ReplContextInfoDict + A dictionary that has the instance variables and their values. + """ + + res: ReplContextInfoDict = { + "group_ctx": self.group_ctx, + "prompt_kwargs": self.prompt_kwargs, + "session": self.session, + "parent": self.parent, + "_history": self._history, + } + + return res + + def session_reset(self) -> None: + """ + Resets values of :class:`~prompt_toolkit.session.PromptSession` to + the provided :attr:`~.prompt_kwargs`, discarding any changes done to the + :class:`~prompt_toolkit.session.PromptSession` object. + """ + + if ISATTY and self.session is not None: + self.session = PromptSession(**self.prompt_kwargs) + + def history(self) -> Generator[str, None, None]: + """ + Generates the history of past executed commands. + + Yields + ------ + str + The executed command string from the history, + in chronological order from most recent to oldest. + """ + + if ISATTY and self.session is not None: + yield from self.session.history.load_history_strings() + + else: + yield from reversed(self._history) + + +def pass_context( + func: Callable[Concatenate[ReplContext | None, P], R], +) -> Callable[P, R]: + """ + Decorator that marks a callback function to receive the current + REPL context object as its first argument. + + Parameters + ---------- + func + The callback function to pass context as its first parameter. + + Returns + ------- + Callable[P,R] + The decorated callback function that receives the current REPL + context object as its first argument. + """ + + @wraps(func) + def decorator(*args: P.args, **kwargs: P.kwargs) -> R: + return func(get_current_repl_ctx(), *args, **kwargs) + + return decorator diff --git a/click_repl/globals_.py b/click_repl/globals_.py new file mode 100644 index 0000000..6a73652 --- /dev/null +++ b/click_repl/globals_.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import sys +from typing import TYPE_CHECKING, NoReturn + +from ._ctx_stack import _context_stack + +if TYPE_CHECKING: + from .core import ReplContext + + +ISATTY = sys.stdin.isatty() + + +def get_current_repl_ctx(silent: bool = False) -> ReplContext | NoReturn | None: + """ + Retrieves the current click-repl context. + + This function provides a way to access the context from anywhere + in the code. This function serves as a more implicit alternative to the + :func:`~click.core.pass_context` decorator. + + Parameters + ---------- + silent + If set to :obj:`True`, the function returns :obj:`None` if no context + is available. The default behavior is to raise a :exc:`~RuntimeError`. + + Returns + ------- + :class:`~click_repl.core.ReplContext` | None + REPL context object if available, or :obj:`None` if ``silent`` is :obj:`True`. + + Raises + ------ + RuntimeError + If there's no context object in the stack and ``silent`` is :obj:`False`. + """ + + try: + return _context_stack[-1] + except IndexError: + if not silent: + raise RuntimeError("There is no active click-repl context.") + + return None diff --git a/setup.cfg b/setup.cfg index 7cba7f5..b8b3a2d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,6 +29,7 @@ packages= install_requires = click>=7.0 prompt_toolkit>=3.0.36 + typing-extensions>=4.7.0 python_requires = >=3.6 zip_safe = no diff --git a/tests/test_repl_ctx.py b/tests/test_repl_ctx.py new file mode 100644 index 0000000..f4fb238 --- /dev/null +++ b/tests/test_repl_ctx.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +import click +import pytest + +import click_repl +from tests import mock_stdin + + +@click.group(invoke_without_command=True) +@click.pass_context +def cli(ctx): + if ctx.invoked_subcommand is None: + click_repl.repl(ctx) + + +@cli.command() +def hello(): + print("Hello!") + + +@cli.command() +@click_repl.pass_context +def history_test(repl_ctx): + print(list(repl_ctx.history())) + + +def test_repl_ctx_history(capsys): + with mock_stdin("hello\nhistory-test\n"): + with pytest.raises(SystemExit): + cli(args=[], prog_name="test_repl_ctx_history") + + assert ( + capsys.readouterr().out.replace("\r\n", "\n") + == "Hello!\n['history-test', 'hello']\n" + ) + + +@cli.command() +@click_repl.pass_context +def prompt_test(repl_ctx): + print(repl_ctx.prompt) + + +def test_repl_ctx_prompt(capsys): + with mock_stdin("prompt-test\n"): + with pytest.raises(SystemExit): + cli(args=[], prog_name="test_repl_ctx_history") + + assert capsys.readouterr().out.replace("\r\n", "\n") == "None\n"