Skip to content

Commit

Permalink
Have Transmitter get values from RunnerConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
Shrews committed Oct 17, 2024
1 parent 1121854 commit 15fe5e5
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 67 deletions.
34 changes: 20 additions & 14 deletions src/ansible_runner/config/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BaseExecutionMode(Enum):

# Metadata string values
class MetaValues(Enum):
STREAMABLE = 'streamable'
TRANSMIT = 'transmit'


@dataclass
Expand All @@ -82,38 +82,38 @@ class BaseConfig:
# No other config objects make use of positional parameters, so this should be fine.
#
# Example use case: RunnerConfig("/tmp/demo", playbook="main.yml", ...)
private_data_dir: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
private_data_dir: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)

artifact_dir: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
artifact_dir: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
check_job_event_data: bool = False
container_auth_data: dict[str, str] | None = None
container_image: str = ""
container_image: str | None = None
container_options: list[str] | None = None
container_volume_mounts: list[str] | None = None
container_workdir: str | None = None
envvars: dict[str, Any] | None = None
fact_cache: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
fact_cache: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
fact_cache_type: str = 'jsonfile'
host_cwd: str | None = None
ident: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
ident: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
json_mode: bool = False
keepalive_seconds: int | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
keepalive_seconds: int | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
passwords: dict[str, str] | None = None
process_isolation: bool = False
process_isolation_executable: str = defaults.default_process_isolation_executable
project_dir: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
project_dir: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
quiet: bool = False
rotate_artifacts: int = 0
settings: dict | None = None
ssh_key: str | None = None
suppress_env_files: bool = False
timeout: int | None = None

event_handler: Callable[[dict], None] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
status_handler: Callable[[dict, BaseConfig], bool] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
artifacts_handler: Callable[[str], None] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
cancel_callback: Callable[[], bool] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
finished_callback: Callable[[BaseConfig], None] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
event_handler: Callable[[dict], None] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
status_handler: Callable[[dict, BaseConfig], bool] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
artifacts_handler: Callable[[str], None] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
cancel_callback: Callable[[], bool] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
finished_callback: Callable[[BaseConfig], None] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)

_CONTAINER_ENGINES = ('docker', 'podman')

Expand All @@ -123,6 +123,8 @@ def __post_init__(self) -> None:
self.command: list[str] = []
self.registry_auth_path: str
self.container_name: str = "" # like other properties, not accurate until prepare is called
if self.container_image is None:
self.container_image = ''

# ignore this for now since it's worker-specific and would just trip up old runners
# self.keepalive_seconds = keepalive_seconds
Expand All @@ -147,7 +149,9 @@ def __post_init__(self) -> None:

if self.ident is None:
self.ident = str(uuid4())
self.ident_set_by_user = False
else:
self.ident_set_by_user = True
self.ident = str(self.ident)

self.artifact_dir = os.path.join(self.artifact_dir, self.ident)
Expand Down Expand Up @@ -185,7 +189,6 @@ def prepare_env(self, runner_mode: str = 'pexpect') -> None:
Manages reading environment metadata files under ``private_data_dir`` and merging/updating
with existing values so the :py:class:`ansible_runner.runner.Runner` object can read and use them easily
"""

if self.ident is None:
raise ConfigurationError("ident value cannot be None")
if self.artifact_dir is None:
Expand Down Expand Up @@ -520,6 +523,9 @@ def wrap_args_for_containerization(self,
if self.private_data_dir is None:
raise ConfigurationError("private_data_dir value cannot be None")

if self.container_image is None:
raise ConfigurationError("container_image value cannot be None")

new_args = [self.process_isolation_executable]
new_args.extend(['run', '--rm'])

Expand Down
32 changes: 28 additions & 4 deletions src/ansible_runner/config/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,42 @@ def extra_vars(self, value):
def streamable_attributes(self) -> dict[str, Any]:
"""Get the set of streamable attributes that have a value that is different from the default.
The field metadata indicates if the attribute is streamable. By default, an attribute
The field metadata indicates if the attribute is streamable from Transmit. By default, an attribute
is considered streamable (must be explicitly disabled).
:return: A dict of attribute names and their values.
"""
retval = {}
for field_obj in fields(self):
if field_obj.metadata and not field_obj.metadata.get(MetaValues.STREAMABLE, True):
if field_obj.metadata and not field_obj.metadata.get(MetaValues.TRANSMIT, True):
continue
current_value = getattr(self, field_obj.name)
if not field_obj.default == current_value:
retval[field_obj.name] = current_value

if field_obj.default == current_value:
continue

# Treat an empty current value (e.g., {} or "") as the same as a default of None to prevent
# streaming unnecessary empty values.
if field_obj.default is None and current_value in ({}, "", []):
continue

retval[field_obj.name] = current_value

return retval

def all_non_default_attributes(self) -> dict[str, Any]:
"""Get all values that have been set differently from their default values.
:return: A dict of attribute names and their values.
"""
retval = {}
for field_obj in fields(self):
current_value = getattr(self, field_obj.name)
if field_obj.default == current_value:
continue
if field_obj.default is None and current_value in ({}, "", []):
continue
retval[field_obj.name] = current_value
return retval

def prepare(self):
Expand Down
9 changes: 3 additions & 6 deletions src/ansible_runner/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import sys
import threading
import logging
from dataclasses import asdict

from ansible_runner import output
from ansible_runner._internal._dump_artifacts import dump_artifacts
Expand Down Expand Up @@ -90,18 +89,16 @@ def init_runner(
config.cancel_callback = signal_handler()

if streamer == 'transmit':
kwargs = asdict(config)
stream_transmitter = Transmitter(only_transmit_kwargs, _output=_output, **kwargs)
stream_transmitter = Transmitter(config, only_transmit_kwargs, _output=_output)
return stream_transmitter

if streamer == 'worker':
kwargs = asdict(config)
kwargs = config.all_non_default_attributes()
stream_worker = Worker(_input=_input, _output=_output, **kwargs)
return stream_worker

if streamer == 'process':
kwargs = asdict(config)
stream_processor = Processor(_input=_input, **kwargs)
stream_processor = Processor(config, _input=_input)
return stream_processor

if config.process_isolation:
Expand Down
46 changes: 19 additions & 27 deletions src/ansible_runner/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import BinaryIO

import ansible_runner
from ansible_runner.config.runner import RunnerConfig
from ansible_runner.exceptions import ConfigurationError
from ansible_runner.loader import ArtifactLoader
import ansible_runner.plugins
Expand All @@ -38,16 +39,14 @@ def __init__(self, settings):


class Transmitter:
def __init__(self, only_transmit_kwargs: bool, _output: BinaryIO | None, **kwargs):
def __init__(self, config: RunnerConfig, only_transmit_kwargs: bool = False, _output: BinaryIO | None = None):
if _output is None:
_output = sys.stdout.buffer
self._output = _output
self.private_data_dir = os.path.abspath(kwargs['private_data_dir'])
self.private_data_dir = os.path.abspath(config.private_data_dir) if config.private_data_dir else ""
self.only_transmit_kwargs = only_transmit_kwargs
if 'keepalive_seconds' in kwargs:
kwargs.pop('keepalive_seconds') # don't confuse older runners with this Worker-only arg

self.kwargs = kwargs
self.kwargs = config.streamable_attributes()

self.status = "unstarted"
self.rc = None
Expand Down Expand Up @@ -251,43 +250,36 @@ def finished_callback(self, runner_obj):


class Processor:
def __init__(self, _input=None, status_handler=None, event_handler=None,
artifacts_handler=None, cancel_callback=None, finished_callback=None, **kwargs):
def __init__(self, config: RunnerConfig, _input: BinaryIO | None = None):
if _input is None:
_input = sys.stdin.buffer
self._input = _input

self.quiet = kwargs.get('quiet')
self.quiet = config.quiet

private_data_dir = kwargs.get('private_data_dir')
if private_data_dir is None:
private_data_dir = tempfile.mkdtemp()
self.private_data_dir = private_data_dir
self.private_data_dir: str = config.private_data_dir or ''
self._loader = ArtifactLoader(self.private_data_dir)

settings = kwargs.get('settings')
settings = config.settings
if settings is None:
try:
settings = self._loader.load_file('env/settings', Mapping)
settings = self._loader.load_file('env/settings', Mapping) # type: ignore
except ConfigurationError:
settings = {}
self.config = MockConfig(settings)

if kwargs.get('artifact_dir'):
self.artifact_dir = os.path.abspath(kwargs.get('artifact_dir'))
else:
project_artifacts = os.path.abspath(os.path.join(self.private_data_dir, 'artifacts'))
if ident := kwargs.get('ident'):
self.artifact_dir = os.path.join(project_artifacts, str(ident))
else:
self.artifact_dir = project_artifacts
self.artifact_dir = config.artifact_dir
if self.artifact_dir and not config.ident_set_by_user:
# If an ident value was not explicitly supplied, for some reason, we don't bother with
# using a subdir named with the ident value.
self.artifact_dir, _ = os.path.split(self.artifact_dir)

self.status_handler = status_handler
self.event_handler = event_handler
self.artifacts_handler = artifacts_handler
self.status_handler = config.status_handler
self.event_handler = config.event_handler
self.artifacts_handler = config.artifacts_handler

self.cancel_callback = cancel_callback # FIXME: unused
self.finished_callback = finished_callback
self.cancel_callback = config.cancel_callback # FIXME: unused
self.finished_callback = config.finished_callback

self.status = "unstarted"
self.rc = None
Expand Down
Loading

0 comments on commit 15fe5e5

Please sign in to comment.