Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce executor code for docker #4438

Merged
merged 7 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions supervisor/addons/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ async def repair(self) -> None:
async def sync_dns(self) -> None:
"""Sync add-ons DNS names."""
# Update hosts
add_host_coros: list[Awaitable[None]] = []
for addon in self.installed:
try:
if not await addon.instance.is_running():
Expand All @@ -448,10 +449,14 @@ async def sync_dns(self) -> None:
)
capture_exception(err)
else:
self.sys_plugins.dns.add_host(
ipv4=addon.ip_address, names=[addon.hostname], write=False
add_host_coros.append(
self.sys_plugins.dns.add_host(
ipv4=addon.ip_address, names=[addon.hostname], write=False
)
)

await asyncio.gather(*add_host_coros)

# Write hosts files
with suppress(CoreDNSError):
self.sys_plugins.dns.write_hosts()
41 changes: 22 additions & 19 deletions supervisor/backups/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ async def _addon_restore(addon_slug: str) -> Awaitable[None] | None:
async def store_folders(self, folder_list: list[str]):
"""Backup Supervisor data into backup."""

def _folder_save(name: str):
async def _folder_save(name: str):
"""Take backup of a folder."""
slug_name = name.replace("/", "_")
tar_name = Path(
Expand All @@ -434,30 +434,33 @@ def _folder_save(name: str):
_LOGGER.warning("Can't find backup folder %s", name)
return

# Take backup
_LOGGER.info("Backing up folder %s", name)
with SecureTarFile(
tar_name, "w", key=self._key, gzip=self.compressed, bufsize=BUF_SIZE
) as tar_file:
atomic_contents_add(
tar_file,
origin_dir,
excludes=[
bound.bind_mount.local_where.as_posix()
for bound in self.sys_mounts.bound_mounts
if bound.bind_mount.local_where
],
arcname=".",
)

_LOGGER.info("Backup folder %s done", name)
def _save() -> None:
# Take backup
_LOGGER.info("Backing up folder %s", name)
with SecureTarFile(
tar_name, "w", key=self._key, gzip=self.compressed, bufsize=BUF_SIZE
) as tar_file:
atomic_contents_add(
tar_file,
origin_dir,
excludes=[
bound.bind_mount.local_where.as_posix()
for bound in self.sys_mounts.bound_mounts
if bound.bind_mount.local_where
],
arcname=".",
)

_LOGGER.info("Backup folder %s done", name)

await self.sys_run_in_executor(_save)
self._data[ATTR_FOLDERS].append(name)

# Save folder sequential
# avoid issue on slow IO
for folder in folder_list:
try:
await self.sys_run_in_executor(_folder_save, folder)
await _folder_save(folder)
except (tarfile.TarError, OSError) as err:
raise BackupError(
f"Can't backup folder {folder}: {str(err)}", _LOGGER.error
Expand Down
2 changes: 1 addition & 1 deletion supervisor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async def setup(self):
self._adjust_system_datetime(),
# Load mounts
self.sys_mounts.load(),
# Start docker monitoring
# Load Docker manager
self.sys_docker.load(),
# Load Plugins container
self.sys_plugins.load(),
Expand Down
10 changes: 7 additions & 3 deletions supervisor/coresys.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import asyncio
from collections.abc import Callable, Coroutine
from datetime import datetime
from functools import partial
import logging
import os
from types import MappingProxyType
Expand Down Expand Up @@ -520,9 +521,12 @@ def now(self) -> datetime:
return datetime.now(get_time_zone(self.timezone) or UTC)

def run_in_executor(
self, funct: Callable[..., T], *args: Any
self, funct: Callable[..., T], *args: tuple[Any], **kwargs: dict[str, Any]
) -> Coroutine[Any, Any, T]:
"""Add an job to the executor pool."""
if kwargs:
funct = partial(funct, **kwargs)

return self.loop.run_in_executor(None, funct, *args)

def create_task(self, coroutine: Coroutine) -> asyncio.Task:
Expand Down Expand Up @@ -700,10 +704,10 @@ def now(self) -> datetime:
return self.coresys.now()

def sys_run_in_executor(
self, funct: Callable[..., T], *args: Any
self, funct: Callable[..., T], *args: tuple[Any], **kwargs: dict[str, Any]
) -> Coroutine[Any, Any, T]:
"""Add an job to the executor pool."""
return self.coresys.run_in_executor(funct, *args)
return self.coresys.run_in_executor(funct, *args, **kwargs)

def sys_create_task(self, coroutine: Coroutine) -> asyncio.Task:
"""Create an async task."""
Expand Down
143 changes: 44 additions & 99 deletions supervisor/docker/addon.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Init file for Supervisor add-on Docker object."""
from __future__ import annotations

import asyncio
from collections.abc import Awaitable
from contextlib import suppress
from ipaddress import IPv4Address, ip_address
Expand Down Expand Up @@ -494,27 +493,25 @@ def mounts(self) -> list[Mount]:

return mounts

def _run(self) -> None:
"""Run Docker image.

Need run inside executor.
"""
if self._is_running():
async def _run(self) -> None:
"""Run Docker image."""
if await self.is_running():
return

# Security check
if not self.addon.protected:
_LOGGER.warning("%s running with disabled protected mode!", self.addon.name)

# Cleanup
self._stop()
await self._stop()

# Don't set a hostname if no separate UTS namespace is used
hostname = None if self.uts_mode else self.addon.hostname

# Create & Run container
try:
docker_container = self.sys_docker.run(
docker_container = await self.sys_run_in_executor(
self.sys_docker.run,
self.image,
tag=str(self.addon.version),
name=self.name,
Expand Down Expand Up @@ -553,7 +550,7 @@ def _run(self) -> None:

# Write data to DNS server
try:
self.sys_plugins.dns.add_host(
await self.sys_plugins.dns.add_host(
ipv4=self.ip_address, names=[self.addon.hostname]
)
except CoreDNSError as err:
Expand All @@ -566,29 +563,26 @@ def _run(self) -> None:
BusEvent.HARDWARE_NEW_DEVICE, self._hardware_events
)

def _update(
async def _update(
self, version: AwesomeVersion, image: str | None = None, latest: bool = False
) -> None:
"""Update a docker image.

Need run inside executor.
"""
"""Update a docker image."""
image = image or self.image

_LOGGER.info(
"Updating image %s:%s to %s:%s", self.image, self.version, image, version
)

# Update docker image
self._install(
await self._install(
version, image=image, latest=latest, need_build=self.addon.latest_need_build
)

# Stop container & cleanup
with suppress(DockerError):
self._stop()
await self._stop()

def _install(
async def _install(
self,
version: AwesomeVersion,
image: str | None = None,
Expand All @@ -597,29 +591,25 @@ def _install(
*,
need_build: bool | None = None,
) -> None:
"""Pull Docker image or build it.

Need run inside executor.
"""
"""Pull Docker image or build it."""
if need_build is None and self.addon.need_build or need_build:
self._build(version)
await self._build(version)
else:
super()._install(version, image, latest, arch)

def _build(self, version: AwesomeVersion) -> None:
"""Build a Docker container.
await super()._install(version, image, latest, arch)

Need run inside executor.
"""
async def _build(self, version: AwesomeVersion) -> None:
"""Build a Docker container."""
build_env = AddonBuild(self.coresys, self.addon)
if not build_env.is_valid:
_LOGGER.error("Invalid build environment, can't build this add-on!")
raise DockerError()

_LOGGER.info("Starting build for %s:%s", self.image, version)
try:
image, log = self.sys_docker.images.build(
use_config_proxy=False, **build_env.get_docker_args(version)
image, log = await self.sys_run_in_executor(
self.sys_docker.images.build,
use_config_proxy=False,
**build_env.get_docker_args(version),
)

_LOGGER.debug("Build %s:%s done: %s", self.image, version, log)
Expand All @@ -645,74 +635,36 @@ def _build(self, version: AwesomeVersion) -> None:
@process_lock
def export_image(self, tar_file: Path) -> Awaitable[None]:
"""Export current images into a tar file."""
return self.sys_run_in_executor(self._export_image, tar_file)

def _export_image(self, tar_file: Path) -> None:
"""Export current images into a tar file.

Need run inside executor.
"""
try:
image = self.sys_docker.api.get_image(f"{self.image}:{self.version}")
except (docker.errors.DockerException, requests.RequestException) as err:
_LOGGER.error("Can't fetch image %s: %s", self.image, err)
raise DockerError() from err

_LOGGER.info("Export image %s to %s", self.image, tar_file)
try:
with tar_file.open("wb") as write_tar:
for chunk in image:
write_tar.write(chunk)
except (OSError, requests.RequestException) as err:
_LOGGER.error("Can't write tar file %s: %s", tar_file, err)
raise DockerError() from err

_LOGGER.info("Export image %s done", self.image)
return self.sys_run_in_executor(
self.sys_docker.export_image, self.image, self.version, tar_file
)

@process_lock
def import_image(self, tar_file: Path) -> Awaitable[None]:
async def import_image(self, tar_file: Path) -> None:
"""Import a tar file as image."""
return self.sys_run_in_executor(self._import_image, tar_file)

def _import_image(self, tar_file: Path) -> None:
"""Import a tar file as image.

Need run inside executor.
"""
try:
with tar_file.open("rb") as read_tar:
docker_image_list = self.sys_docker.images.load(read_tar)

if len(docker_image_list) != 1:
_LOGGER.warning(
"Unexpected image count %d while importing image from tar",
len(docker_image_list),
)
return
docker_image = docker_image_list[0]
except (docker.errors.DockerException, OSError) as err:
_LOGGER.error("Can't import image %s: %s", self.image, err)
raise DockerError() from err

self._meta = docker_image.attrs
_LOGGER.info("Importing image %s and version %s", tar_file, self.version)
docker_image = await self.sys_run_in_executor(
self.sys_docker.import_image, tar_file
)
if docker_image:
self._meta = docker_image.attrs
_LOGGER.info("Importing image %s and version %s", tar_file, self.version)

with suppress(DockerError):
self._cleanup()
with suppress(DockerError):
await self._cleanup()

@process_lock
def write_stdin(self, data: bytes) -> Awaitable[None]:
async def write_stdin(self, data: bytes) -> None:
"""Write to add-on stdin."""
return self.sys_run_in_executor(self._write_stdin, data)
if not await self.is_running():
raise DockerError()

await self.sys_run_in_executor(self._write_stdin, data)

def _write_stdin(self, data: bytes) -> None:
"""Write to add-on stdin.

Need run inside executor.
"""
if not self._is_running():
raise DockerError()

try:
# Load needed docker objects
container = self.sys_docker.containers.get(self.name)
Expand All @@ -730,15 +682,12 @@ def _write_stdin(self, data: bytes) -> None:
_LOGGER.error("Can't write to %s stdin: %s", self.name, err)
raise DockerError() from err

def _stop(self, remove_container=True) -> None:
"""Stop/remove Docker container.

Need run inside executor.
"""
async def _stop(self, remove_container: bool = True) -> None:
"""Stop/remove Docker container."""
# DNS
if self.ip_address != NO_ADDDRESS:
try:
self.sys_plugins.dns.delete_host(self.addon.hostname)
await self.sys_plugins.dns.delete_host(self.addon.hostname)
except CoreDNSError as err:
_LOGGER.warning("Can't update DNS for %s", self.name)
capture_exception(err)
Expand All @@ -748,21 +697,17 @@ def _stop(self, remove_container=True) -> None:
self.sys_bus.remove_listener(self._hw_listener)
self._hw_listener = None

super()._stop(remove_container)
await super()._stop(remove_container)

def _validate_trust(
async def _validate_trust(
self, image_id: str, image: str, version: AwesomeVersion
) -> None:
"""Validate trust of content."""
if not self.addon.signed:
return

checksum = image_id.partition(":")[2]
job = asyncio.run_coroutine_threadsafe(
self.sys_security.verify_content(self.addon.codenotary, checksum),
self.sys_loop,
)
job.result()
return await self.sys_security.verify_content(self.addon.codenotary, checksum)

@Job(conditions=[JobCondition.OS_AGENT], limit=JobExecutionLimit.SINGLE_WAIT)
async def _hardware_events(self, device: Device) -> None:
Expand Down
Loading