Skip to content

Commit

Permalink
Expose more Sqlalchemy settings, part 1 (#16742)
Browse files Browse the repository at this point in the history
Co-authored-by: zzstoatzz <thrast36@gmail.com>
  • Loading branch information
cicdw and zzstoatzz authored Jan 17, 2025
1 parent 97708c2 commit ea87722
Show file tree
Hide file tree
Showing 35 changed files with 619 additions and 263 deletions.
89 changes: 62 additions & 27 deletions docs/v3/develop/settings-ref.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ The log level of the runner's webserver.

**Type**: `string`

**Default**: `error`
**Default**: `ERROR`

**Constraints**:
- Allowed values: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
Expand Down Expand Up @@ -871,6 +871,58 @@ Number of seconds a runner should wait between heartbeats for flow runs.

**TOML dotted key path**: `runner.server`

---
## SQLAlchemySettings
Settings for controlling SQLAlchemy behavior; note that these settings only take effect when
using a PostgreSQL database.
### `pool_size`
Controls connection pool size of database connection pools from the Prefect backend.

**Type**: `integer`

**Default**: `5`

**TOML dotted key path**: `server.database.sqlalchemy.pool_size`

**Supported environment variables**:
`PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_SIZE`, `PREFECT_SQLALCHEMY_POOL_SIZE`

### `pool_recycle`
This setting causes the pool to recycle connections after the given number of seconds has passed; set it to -1 to avoid recycling entirely.

**Type**: `integer`

**Default**: `3600`

**TOML dotted key path**: `server.database.sqlalchemy.pool_recycle`

**Supported environment variables**:
`PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_RECYCLE`

### `pool_timeout`
Number of seconds to wait before giving up on getting a connection from the pool. Defaults to 30 seconds.

**Type**: `number | None`

**Default**: `30.0`

**TOML dotted key path**: `server.database.sqlalchemy.pool_timeout`

**Supported environment variables**:
`PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_TIMEOUT`

### `max_overflow`
Controls maximum overflow of the connection pool. To prevent overflow, set to -1.

**Type**: `integer`

**Default**: `10`

**TOML dotted key path**: `server.database.sqlalchemy.max_overflow`

**Supported environment variables**:
`PREFECT_SERVER_DATABASE_SQLALCHEMY_MAX_OVERFLOW`, `PREFECT_SQLALCHEMY_MAX_OVERFLOW`

---
## ServerAPISettings
Settings for controlling API server behavior
Expand Down Expand Up @@ -1039,6 +1091,13 @@ The default limit applied to queries that can return multiple objects, such as `
---
## ServerDatabaseSettings
Settings for controlling server database behavior
### `sqlalchemy`
Settings for controlling SQLAlchemy behavior

**Type**: [SQLAlchemySettings](#sqlalchemysettings)

**TOML dotted key path**: `server.database.sqlalchemy`

### `connection_url`

A database connection URL in a SQLAlchemy-compatible
Expand Down Expand Up @@ -1159,7 +1218,7 @@ If `True`, the database will be migrated on application startup.
`PREFECT_SERVER_DATABASE_MIGRATE_ON_START`, `PREFECT_API_DATABASE_MIGRATE_ON_START`

### `timeout`
A statement timeout, in seconds, applied to all database interactions made by the API. Defaults to 10 seconds.
A statement timeout, in seconds, applied to all database interactions made by the Prefect backend. Defaults to 10 seconds.

**Type**: `number | None`

Expand All @@ -1182,20 +1241,8 @@ A connection timeout, in seconds, applied to database connections. Defaults to `
**Supported environment variables**:
`PREFECT_SERVER_DATABASE_CONNECTION_TIMEOUT`, `PREFECT_API_DATABASE_CONNECTION_TIMEOUT`

### `sqlalchemy_pool_size`
Controls connection pool size of database connection pools from the Prefect API. If not set, the default SQLAlchemy pool size will be used.

**Type**: `integer | None`

**Default**: `None`

**TOML dotted key path**: `server.database.sqlalchemy_pool_size`

**Supported environment variables**:
`PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_SIZE`, `PREFECT_SQLALCHEMY_POOL_SIZE`

### `connection_app_name`
Controls the application_name field for connections opened from the connection pool when using a PostgreSQL database with the Prefect API.
Controls the application_name field for connections opened from the connection pool when using a PostgreSQL database with the Prefect backend.

**Type**: `string | None`

Expand All @@ -1206,18 +1253,6 @@ Controls the application_name field for connections opened from the connection p
**Supported environment variables**:
`PREFECT_SERVER_DATABASE_CONNECTION_APP_NAME`

### `sqlalchemy_max_overflow`
Controls maximum overflow of the connection pool when using a PostgreSQL database with the Prefect API. If not set, the default SQLAlchemy maximum overflow value will be used.

**Type**: `integer | None`

**Default**: `None`

**TOML dotted key path**: `server.database.sqlalchemy_max_overflow`

**Supported environment variables**:
`PREFECT_SERVER_DATABASE_SQLALCHEMY_MAX_OVERFLOW`, `PREFECT_SQLALCHEMY_MAX_OVERFLOW`

---
## ServerDeploymentsSettings
### `concurrency_slot_wait_seconds`
Expand Down
97 changes: 60 additions & 37 deletions schemas/settings.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@
"type": "integer"
},
"log_level": {
"default": "error",
"default": "ERROR",
"description": "The log level of the runner's webserver.",
"enum": [
"DEBUG",
Expand Down Expand Up @@ -739,6 +739,58 @@
"title": "RunnerSettings",
"type": "object"
},
"SQLAlchemySettings": {
"description": "Settings for controlling SQLAlchemy behavior; note that these settings only take effect when\nusing a PostgreSQL database.",
"properties": {
"pool_size": {
"default": 5,
"description": "Controls connection pool size of database connection pools from the Prefect backend.",
"supported_environment_variables": [
"PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_SIZE",
"PREFECT_SQLALCHEMY_POOL_SIZE"
],
"title": "Pool Size",
"type": "integer"
},
"pool_recycle": {
"default": 3600,
"description": "This setting causes the pool to recycle connections after the given number of seconds has passed; set it to -1 to avoid recycling entirely.",
"supported_environment_variables": [
"PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_RECYCLE"
],
"title": "Pool Recycle",
"type": "integer"
},
"pool_timeout": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": 30.0,
"description": "Number of seconds to wait before giving up on getting a connection from the pool. Defaults to 30 seconds.",
"supported_environment_variables": [
"PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_TIMEOUT"
],
"title": "Pool Timeout"
},
"max_overflow": {
"default": 10,
"description": "Controls maximum overflow of the connection pool. To prevent overflow, set to -1.",
"supported_environment_variables": [
"PREFECT_SERVER_DATABASE_SQLALCHEMY_MAX_OVERFLOW",
"PREFECT_SQLALCHEMY_MAX_OVERFLOW"
],
"title": "Max Overflow",
"type": "integer"
}
},
"title": "SQLAlchemySettings",
"type": "object"
},
"ServerAPISettings": {
"description": "Settings for controlling API server behavior",
"properties": {
Expand Down Expand Up @@ -855,6 +907,11 @@
"ServerDatabaseSettings": {
"description": "Settings for controlling server database behavior",
"properties": {
"sqlalchemy": {
"$ref": "#/$defs/SQLAlchemySettings",
"description": "Settings for controlling SQLAlchemy behavior",
"supported_environment_variables": []
},
"connection_url": {
"anyOf": [
{
Expand Down Expand Up @@ -1012,7 +1069,7 @@
}
],
"default": 10.0,
"description": "A statement timeout, in seconds, applied to all database interactions made by the API. Defaults to 10 seconds.",
"description": "A statement timeout, in seconds, applied to all database interactions made by the Prefect backend. Defaults to 10 seconds.",
"supported_environment_variables": [
"PREFECT_SERVER_DATABASE_TIMEOUT",
"PREFECT_API_DATABASE_TIMEOUT"
Expand All @@ -1036,23 +1093,6 @@
],
"title": "Connection Timeout"
},
"sqlalchemy_pool_size": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Controls connection pool size of database connection pools from the Prefect API. If not set, the default SQLAlchemy pool size will be used.",
"supported_environment_variables": [
"PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_SIZE",
"PREFECT_SQLALCHEMY_POOL_SIZE"
],
"title": "Sqlalchemy Pool Size"
},
"connection_app_name": {
"anyOf": [
{
Expand All @@ -1063,28 +1103,11 @@
}
],
"default": null,
"description": "Controls the application_name field for connections opened from the connection pool when using a PostgreSQL database with the Prefect API.",
"description": "Controls the application_name field for connections opened from the connection pool when using a PostgreSQL database with the Prefect backend.",
"supported_environment_variables": [
"PREFECT_SERVER_DATABASE_CONNECTION_APP_NAME"
],
"title": "Connection App Name"
},
"sqlalchemy_max_overflow": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Controls maximum overflow of the connection pool when using a PostgreSQL database with the Prefect API. If not set, the default SQLAlchemy maximum overflow value will be used.",
"supported_environment_variables": [
"PREFECT_SERVER_DATABASE_SQLALCHEMY_MAX_OVERFLOW",
"PREFECT_SQLALCHEMY_MAX_OVERFLOW"
],
"title": "Sqlalchemy Max Overflow"
}
},
"title": "ServerDatabaseSettings",
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ filterwarnings =
ignore::pluggy.PluggyTeardownRaisedWarning



[mypy]
plugins=
pydantic.mypy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

from pydantic import AliasChoices, AliasPath, Field

from prefect.settings.base import PrefectBaseSettings, _build_settings_config
from prefect.settings.base import PrefectBaseSettings, build_settings_config


class KubernetesWorkerSettings(PrefectBaseSettings):
model_config = _build_settings_config(("integrations", "kubernetes", "worker"))
model_config = build_settings_config(("integrations", "kubernetes", "worker"))

api_key_secret_name: Optional[str] = Field(
default=None,
Expand Down Expand Up @@ -35,7 +35,7 @@ class KubernetesWorkerSettings(PrefectBaseSettings):


class KubernetesSettings(PrefectBaseSettings):
model_config = _build_settings_config(("integrations", "kubernetes"))
model_config = build_settings_config(("integrations", "kubernetes"))

cluster_uid: Optional[str] = Field(
default=None,
Expand Down
4 changes: 2 additions & 2 deletions src/integrations/prefect-redis/prefect_redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

from prefect.settings.base import (
PrefectBaseSettings,
_build_settings_config, # type: ignore[reportPrivateUsage]
build_settings_config, # type: ignore[reportPrivateUsage]
)


class RedisMessagingSettings(PrefectBaseSettings):
model_config = _build_settings_config(
model_config = build_settings_config(
(
"redis",
"messaging",
Expand Down
20 changes: 9 additions & 11 deletions src/prefect/server/database/configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
PREFECT_API_DATABASE_TIMEOUT,
PREFECT_SERVER_DATABASE_CONNECTION_APP_NAME,
PREFECT_SQLALCHEMY_MAX_OVERFLOW,
PREFECT_SQLALCHEMY_POOL_SIZE,
PREFECT_TESTING_UNIT_TEST_MODE,
get_current_settings,
)
from prefect.utilities.asyncutils import add_event_loop_shutdown_callback

Expand Down Expand Up @@ -131,7 +131,8 @@ def __init__(
connection_timeout or PREFECT_API_DATABASE_CONNECTION_TIMEOUT.value()
)
self.sqlalchemy_pool_size: Optional[int] = (
sqlalchemy_pool_size or PREFECT_SQLALCHEMY_POOL_SIZE.value()
sqlalchemy_pool_size
or get_current_settings().server.database.sqlalchemy.pool_size
)
self.sqlalchemy_max_overflow: Optional[int] = (
sqlalchemy_max_overflow or PREFECT_SQLALCHEMY_MAX_OVERFLOW.value()
Expand Down Expand Up @@ -205,8 +206,11 @@ async def engine(self) -> AsyncEngine:
self.timeout,
)
if cache_key not in ENGINES:
# apply database timeout
kwargs: dict[str, Any] = dict()
kwargs: dict[
str, Any
] = get_current_settings().server.database.sqlalchemy.model_dump(
mode="json"
)
connect_args: dict[str, Any] = dict()

if self.timeout is not None:
Expand Down Expand Up @@ -337,7 +341,7 @@ async def engine(self) -> AsyncEngine:
f"{sqlite3.sqlite_version}"
)

kwargs: dict[str, Any] = {}
kwargs: dict[str, Any] = dict()

loop = get_running_loop()

Expand All @@ -347,12 +351,6 @@ async def engine(self) -> AsyncEngine:
if self.timeout is not None:
kwargs["connect_args"] = dict(timeout=self.timeout)

if self.sqlalchemy_pool_size is not None:
kwargs["pool_size"] = self.sqlalchemy_pool_size

if self.sqlalchemy_max_overflow is not None:
kwargs["max_overflow"] = self.sqlalchemy_max_overflow

# use `named` paramstyle for sqlite instead of `qmark` in very rare
# circumstances, we've seen aiosqlite pass parameters in the wrong
# order; by using named parameters we avoid this issue
Expand Down
5 changes: 4 additions & 1 deletion src/prefect/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def _add_environment_variables(
env_vars.append(f"{model.model_config.get('env_prefix')}{property.upper()}")


def _build_settings_config( # pyright: ignore[reportUnusedFunction] This is used elsewhere. TODO: update to be a public function because it is used in integration libraries.
def build_settings_config(
path: Tuple[str, ...] = tuple(), frozen: bool = False
) -> PrefectSettingsConfigDict:
env_prefix = f"PREFECT_{'_'.join(path).upper()}_" if path else "PREFECT_"
Expand All @@ -212,6 +212,9 @@ def _build_settings_config( # pyright: ignore[reportUnusedFunction] This is use
)


_build_settings_config = build_settings_config # noqa # TODO: remove once all usage updated


def _to_environment_variable_value(
value: list[object] | set[object] | tuple[object] | Any,
) -> str:
Expand Down
Loading

0 comments on commit ea87722

Please sign in to comment.