Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous CLI methods in CliApp #533

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,72 @@ For `BaseModel` and `pydantic.dataclasses.dataclass` types, `CliApp.run` will in
* `cli_implicit_flags=True`
* `cli_kebab_case=True`

### Asynchronous CLI Commands

Pydantic settings now supports running asynchronous CLI commands via CliApp.run and CliApp.run_subcommand. With this feature, you can define async def methods within your Pydantic models (including subcommands) and have them executed just like their synchronous counterparts. Specifically:

1. Asynchronous methods are supported: You can now mark your cli_cmd or similar CLI entrypoint methods as async def and have CliApp execute them.
2. Subcommands may also be asynchronous: If you have nested CLI subcommands, the final (lowest-level) subcommand methods can likewise be asynchronous.
3. Limit asynchronous methods to final subcommands: Defining parent commands as asynchronous is not recommended, because it can result in additional threads and event loops being created. For best performance and to avoid unnecessary resource usage, only implement your deepest (child) subcommands as async def.

Below is a simple example demonstrating an asynchronous top-level command:

```py
from pydantic_settings import BaseSettings, CliApp


class AsyncSettings(BaseSettings):
async def cli_cmd(self) -> None:
print('Hello from an async CLI method!')
#> Hello from an async CLI method!


# If an event loop is already running, a new thread will be used;
# otherwise, asyncio.run() is used to execute this async method.
assert CliApp.run(AsyncSettings, cli_args=[]).model_dump() == {}
```

#### Asynchronous Subcommands

As mentioned above, you can also define subcommands as async. However, only do so for the leaf (lowest-level) subcommand to avoid spawning new threads and event loops unnecessarily in parent commands:

```py
from pydantic import BaseModel

from pydantic_settings import (
BaseSettings,
CliApp,
CliPositionalArg,
CliSubCommand,
)


class Clone(BaseModel):
repository: CliPositionalArg[str]
directory: CliPositionalArg[str]

async def cli_cmd(self) -> None:
# Perform async tasks here, e.g. network or I/O operations
print(f'Cloning async from "{self.repository}" into "{self.directory}"')
#> Cloning async from "repo" into "dir"


class Git(BaseSettings):
clone: CliSubCommand[Clone]

def cli_cmd(self) -> None:
# Run the final subcommand (clone/init). It is recommended to define async methods only at the deepest level.
CliApp.run_subcommand(self)


CliApp.run(Git, cli_args=['clone', 'repo', 'dir']).model_dump() == {
'repository': 'repo',
'directory': 'dir',
}
```

When executing a subcommand with an asynchronous cli_cmd, Pydantic settings automatically detects whether the current thread already has an active event loop. If so, the async command is run in a fresh thread to avoid conflicts. Otherwise, it uses asyncio.run() in the current thread. This handling ensures your asynchronous subcommands “just work” without additional manual setup.

### Mutually Exclusive Groups

CLI mutually exclusive groups can be created by inheriting from the `CliMutuallyExclusiveGroup` class.
Expand Down
49 changes: 45 additions & 4 deletions pydantic_settings/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from __future__ import annotations as _annotations

import asyncio
import inspect
import threading
from argparse import Namespace
from types import SimpleNamespace
from typing import Any, ClassVar, TypeVar
Expand Down Expand Up @@ -446,10 +449,48 @@ class CliApp:

@staticmethod
def _run_cli_cmd(model: Any, cli_cmd_method_name: str, is_required: bool) -> Any:
if hasattr(type(model), cli_cmd_method_name):
getattr(type(model), cli_cmd_method_name)(model)
elif is_required:
raise SettingsError(f'Error: {type(model).__name__} class is missing {cli_cmd_method_name} entrypoint')
command = getattr(type(model), cli_cmd_method_name, None)
if command is None:
if is_required:
raise SettingsError(f'Error: {type(model).__name__} class is missing {cli_cmd_method_name} entrypoint')
return model

# If the method is asynchronous, we handle its execution based on the current event loop status.
if inspect.iscoroutinefunction(command):
# For asynchronous methods, we have two execution scenarios:
# 1. If no event loop is running in the current thread, run the coroutine directly with asyncio.run().
# 2. If an event loop is already running in the current thread, run the coroutine in a separate thread to avoid conflicts.
try:
# Check if an event loop is currently running in this thread.
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None

if loop and loop.is_running():
# We're in a context with an active event loop (e.g., Jupyter Notebook).
# Running asyncio.run() here would cause conflicts, so we use a separate thread.
exception_container = []

def run_coro() -> None:
try:
# Execute the coroutine in a new event loop in this separate thread.
asyncio.run(command(model))
except Exception as e:
exception_container.append(e)

thread = threading.Thread(target=run_coro)
thread.start()
thread.join()
if exception_container:
# Propagate exceptions from the separate thread.
raise exception_container[0]
else:
# No event loop is running; safe to run the coroutine directly.
asyncio.run(command(model))
else:
# For synchronous methods, call them directly.
command(model)

return model

@staticmethod
Expand Down
24 changes: 24 additions & 0 deletions tests/test_source_cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
import asyncio
import re
import sys
import time
Expand Down Expand Up @@ -2120,6 +2121,29 @@ def alt_cmd(self) -> None:
}


def test_cli_app_async_method_no_existing_loop():
class Command(BaseSettings):
called: bool = False

async def cli_cmd(self) -> None:
self.called = True

assert CliApp.run(Command, cli_args=[]).called


def test_cli_app_async_method_with_existing_loop():
class Command(BaseSettings):
called: bool = False

async def cli_cmd(self) -> None:
self.called = True

async def run_as_coro():
return CliApp.run(Command, cli_args=[])

assert asyncio.run(run_as_coro()).called


def test_cli_app_exceptions():
with pytest.raises(
SettingsError, match='Error: NotPydanticModel is not subclass of BaseModel or pydantic.dataclasses.dataclass'
Expand Down
Loading