Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job group execution limit option #4457

Merged
merged 7 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions supervisor/addons/addon.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,10 +791,7 @@ async def stats(self) -> DockerStats:
raise AddonsError() from err

async def write_stdin(self, data) -> None:
"""Write data to add-on stdin.

Return a coroutine.
"""
"""Write data to add-on stdin."""
if not self.with_stdin:
raise AddonsNotSupportedError(
f"Add-on {self.slug} does not support writing to stdin!", _LOGGER.error
Expand Down Expand Up @@ -889,7 +886,10 @@ def _write_tarfile():
await self._backup_command(self.backup_pre)
elif is_running and self.backup_mode == AddonBackupMode.COLD:
_LOGGER.info("Shutdown add-on %s for cold backup", self.slug)
await self.instance.stop()
try:
await self.instance.stop()
except DockerError as err:
raise AddonsError() from err

try:
_LOGGER.info("Building backup for add-on %s", self.slug)
Expand Down
37 changes: 21 additions & 16 deletions supervisor/docker/addon.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@
CoreDNSError,
DBusError,
DockerError,
DockerJobError,
DockerNotFound,
HardwareNotFound,
)
from ..hardware.const import PolicyGroup
from ..hardware.data import Device
from ..jobs.decorator import Job, JobCondition, JobExecutionLimit
from ..jobs.const import JobCondition, JobExecutionLimit
from ..jobs.decorator import Job
from ..resolution.const import ContextType, IssueType, SuggestionType
from ..utils import process_lock
from ..utils.sentry import capture_exception
from .const import (
ENV_TIME,
Expand Down Expand Up @@ -73,8 +74,8 @@ class DockerAddon(DockerInterface):

def __init__(self, coresys: CoreSys, addon: Addon):
"""Initialize Docker Home Assistant wrapper."""
super().__init__(coresys)
self.addon: Addon = addon
super().__init__(coresys)

self._hw_listener: EventListener | None = None

Expand Down Expand Up @@ -493,7 +494,8 @@ def mounts(self) -> list[Mount]:

return mounts

async def _run(self) -> None:
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def run(self) -> None:
"""Run Docker image."""
if await self.is_running():
return
Expand All @@ -503,7 +505,7 @@ async def _run(self) -> None:
_LOGGER.warning("%s running with disabled protected mode!", self.addon.name)

# Cleanup
await self._stop()
await self.stop()

# Don't set a hostname if no separate UTS namespace is used
hostname = None if self.uts_mode else self.addon.hostname
Expand Down Expand Up @@ -563,7 +565,8 @@ async def _run(self) -> None:
BusEvent.HARDWARE_NEW_DEVICE, self._hardware_events
)

async def _update(
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def update(
self, version: AwesomeVersion, image: str | None = None, latest: bool = False
) -> None:
"""Update a docker image."""
Expand All @@ -574,15 +577,16 @@ async def _update(
)

# Update docker image
await self._install(
await self.install(
version, image=image, latest=latest, need_build=self.addon.latest_need_build
)

# Stop container & cleanup
with suppress(DockerError):
await self._stop()
await self.stop()

async def _install(
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def install(
self,
version: AwesomeVersion,
image: str | None = None,
Expand All @@ -595,7 +599,7 @@ async def _install(
if need_build is None and self.addon.need_build or need_build:
await self._build(version)
else:
await super()._install(version, image, latest, arch)
await super().install(version, image, latest, arch)

async def _build(self, version: AwesomeVersion) -> None:
"""Build a Docker container."""
Expand Down Expand Up @@ -632,14 +636,14 @@ async def _build(self, version: AwesomeVersion) -> None:

_LOGGER.info("Build %s:%s done", self.image, version)

@process_lock
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
def export_image(self, tar_file: Path) -> Awaitable[None]:
"""Export current images into a tar file."""
return self.sys_run_in_executor(
self.sys_docker.export_image, self.image, self.version, tar_file
)

@process_lock
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def import_image(self, tar_file: Path) -> None:
"""Import a tar file as image."""
docker_image = await self.sys_run_in_executor(
Expand All @@ -650,9 +654,9 @@ async def import_image(self, tar_file: Path) -> None:
_LOGGER.info("Importing image %s and version %s", tar_file, self.version)

with suppress(DockerError):
await self._cleanup()
await self.cleanup()

@process_lock
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def write_stdin(self, data: bytes) -> None:
"""Write to add-on stdin."""
if not await self.is_running():
Expand Down Expand Up @@ -682,7 +686,8 @@ def _write_stdin(self, data: bytes) -> None:
_LOGGER.error("Can't write to %s stdin: %s", self.name, err)
raise DockerError() from err

async def _stop(self, remove_container: bool = True) -> None:
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def stop(self, remove_container: bool = True) -> None:
"""Stop/remove Docker container."""
# DNS
if self.ip_address != NO_ADDDRESS:
Expand All @@ -697,7 +702,7 @@ async def _stop(self, remove_container: bool = True) -> None:
self.sys_bus.remove_listener(self._hw_listener)
self._hw_listener = None

await super()._stop(remove_container)
await super().stop(remove_container)

async def _validate_trust(
self, image_id: str, image: str, version: AwesomeVersion
Expand Down
8 changes: 6 additions & 2 deletions supervisor/docker/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@

from ..const import DOCKER_CPU_RUNTIME_ALLOCATION, MACHINE_ID
from ..coresys import CoreSysAttributes
from ..exceptions import DockerJobError
from ..hardware.const import PolicyGroup
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from .const import (
ENV_TIME,
MOUNT_DBUS,
Expand Down Expand Up @@ -82,13 +85,14 @@ def cpu_rt_runtime(self) -> int | None:
return None
return DOCKER_CPU_RUNTIME_ALLOCATION

async def _run(self) -> None:
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def run(self) -> None:
"""Run Docker image."""
if await self.is_running():
return

# Cleanup
await self._stop()
await self.stop()

# Create & Run container
docker_container = await self.sys_run_in_executor(
Expand Down
8 changes: 6 additions & 2 deletions supervisor/docker/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import logging

from ..coresys import CoreSysAttributes
from ..exceptions import DockerJobError
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from .const import ENV_TIME, ENV_TOKEN
from .interface import DockerInterface

Expand All @@ -23,13 +26,14 @@ def name(self) -> str:
"""Return name of Docker container."""
return CLI_DOCKER_NAME

async def _run(self) -> None:
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def run(self) -> None:
"""Run Docker image."""
if await self.is_running():
return

# Cleanup
await self._stop()
await self.stop()

# Create & Run container
docker_container = await self.sys_run_in_executor(
Expand Down
8 changes: 6 additions & 2 deletions supervisor/docker/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from docker.types import Mount

from ..coresys import CoreSysAttributes
from ..exceptions import DockerJobError
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from .const import ENV_TIME, MOUNT_DBUS, MountType
from .interface import DockerInterface

Expand All @@ -25,13 +28,14 @@ def name(self) -> str:
"""Return name of Docker container."""
return DNS_DOCKER_NAME

async def _run(self) -> None:
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def run(self) -> None:
"""Run Docker image."""
if await self.is_running():
return

# Cleanup
await self._stop()
await self.stop()

# Create & Run container
docker_container = await self.sys_run_in_executor(
Expand Down
11 changes: 7 additions & 4 deletions supervisor/docker/homeassistant.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
from docker.types import Mount

from ..const import LABEL_MACHINE, MACHINE_ID
from ..exceptions import DockerJobError
from ..hardware.const import PolicyGroup
from ..homeassistant.const import LANDINGPAGE
from ..utils import process_lock
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from .const import (
ENV_TIME,
ENV_TOKEN,
Expand Down Expand Up @@ -131,13 +133,14 @@ def mounts(self) -> list[Mount]:

return mounts

async def _run(self) -> None:
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def run(self) -> None:
"""Run Docker image."""
if await self.is_running():
return

# Cleanup
await self._stop()
await self.stop()

# Create & Run container
docker_container = await self.sys_run_in_executor(
Expand Down Expand Up @@ -173,7 +176,7 @@ async def _run(self) -> None:
"Starting Home Assistant %s with version %s", self.image, self.version
)

@process_lock
@Job(limit=JobExecutionLimit.GROUP_ONCE, on_condition=DockerJobError)
async def execute_command(self, command: str) -> CommandReturn:
"""Create a temporary container and run command."""
return await self.sys_run_in_executor(
Expand Down
Loading