Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Composition over inheritance in probes #454

Merged
merged 15 commits into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion goth/default-assets/goth-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ node-types:
class: "goth.runner.probe.RequestorProbe"

- name: "VM-Wasm-Provider"
class: "goth.runner.provider.ProviderProbeWithLogSteps"
class: "goth.runner.probe.ProviderProbe"
mount:
- read-only: "provider/presets.json"
destination: "/root/.local/share/ya-provider/presets.json"
Expand Down
10 changes: 4 additions & 6 deletions goth/interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

from goth.configuration import Configuration
from goth.runner import Runner
from goth.runner.probe import RequestorProbe
from goth.runner.provider import ProviderProbeWithLogSteps
from goth.runner.probe import ProviderProbe, RequestorProbe


logger = logging.getLogger(__name__)
Expand All @@ -30,7 +29,7 @@ async def start_network(

async with runner(configuration.containers):

providers = runner.get_probes(probe_type=ProviderProbeWithLogSteps)
providers = runner.get_probes(probe_type=ProviderProbe)
requestor = runner.get_probes(probe_type=RequestorProbe)[0]

# Some test steps may be included in the interactive test as well
Expand All @@ -41,9 +40,8 @@ async def start_network(
env = {"PATH": "$PATH"}
requestor.set_agent_env_vars(env)
env_vars = " ".join([f"{key}={val}" for key, val in env.items()])
print(
f"$ {env_vars} examples/blender/blender.py --subnet {providers[0].subnet}"
)
subnet = providers[0].provider_agent.subnet
print(f"$ {env_vars} examples/blender/blender.py --subnet {subnet}")

print("\nPress Ctrl+C at any moment to stop the test harness.\033[0m\n")

Expand Down
12 changes: 3 additions & 9 deletions goth/runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import docker

from goth.runner.agent import AgentMixin
from goth.runner.container.compose import (
ComposeConfig,
ComposeNetworkManager,
Expand Down Expand Up @@ -127,12 +126,12 @@ def get_probes(
def check_assertion_errors(self) -> None:
"""If any monitor reports an assertion error, raise the first error."""

agent_probes = self.get_probes(probe_type=AgentMixin) # type: ignore
probe_agents = chain(*(probe.agents for probe in self.probes))

monitors = chain.from_iterable(
(
(probe.container.logs for probe in self.probes),
(probe.agent_logs for probe in agent_probes),
(agent.log_monitor for agent in probe_agents),
[self.proxy.monitor] if self.proxy else [],
)
)
Expand All @@ -149,9 +148,6 @@ def _create_probes(self, scenario_dir: Path) -> None:
docker_client = docker.from_env()

for config in self._topology:
logger.debug(
"Creating probe. config=%s, probe_type=%s", config, config.probe_type
)
log_config = config.log_config or LogConfig(config.name)
log_config.base_dir = scenario_dir

Expand Down Expand Up @@ -203,9 +199,7 @@ async def _start_nodes(self):
await self._exit_stack.enter_async_context(run_proxy(self.proxy))

# Collect all agent enabled probes and start them in parallel
awaitables = []
for probe in self.get_probes(probe_type=AgentMixin):
awaitables.append(probe.start_agent())
awaitables = [probe.start_agents() for probe in self.probes]
await asyncio.gather(*awaitables)

@property
Expand Down
58 changes: 0 additions & 58 deletions goth/runner/agent.py

This file was deleted.

4 changes: 4 additions & 0 deletions goth/runner/container/compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,11 @@ async def run_compose_network(
"""Implement AsyncContextManager for starting/stopping docker compose network."""

try:
logger.debug(
"Starting compose network. log_dir=%s, force_build=%s", log_dir, force_build
)
await compose_manager.start_network(log_dir, force_build)
yield
finally:
logger.debug("Stopping compose network")
await compose_manager.stop_network()
110 changes: 57 additions & 53 deletions goth/runner/probe.py → goth/runner/probe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Classes and helpers for managing Probes."""
"""Package related to goth's probe interface."""

import abc
import asyncio
from collections import OrderedDict
import contextlib
import copy
import logging
Expand All @@ -10,6 +11,7 @@
AsyncIterator,
Dict,
Iterator,
List,
Optional,
Tuple,
TYPE_CHECKING,
Expand All @@ -19,14 +21,12 @@

from goth.address import (
YAGNA_BUS_URL,
YAGNA_REST_PORT,
YAGNA_REST_URL,
)

from goth import gftp
from goth.node import DEFAULT_SUBNET
from goth.runner.agent import AgentMixin
from goth.runner.api_client import ApiClientMixin
from goth.runner import process
from goth.runner.cli import Cli, YagnaDockerCli
from goth.runner.container.utils import get_container_address
from goth.runner.container.yagna import (
Expand All @@ -37,7 +37,14 @@
from goth.runner.exceptions import KeyAlreadyExistsError
from goth.runner.log import LogConfig, monitored_logger
from goth.runner.log_monitor import PatternMatchingEventMonitor
from goth.runner import process
from goth.runner.probe.agent import AgentComponent, ProviderAgentComponent
from goth.runner.probe.mixin import (
ActivityApiMixin,
MarketApiMixin,
PaymentApiMixin,
ProviderLogMixin,
)
from goth.runner.probe.rest_client import RestApiComponent


if TYPE_CHECKING:
Expand All @@ -63,6 +70,9 @@ class Probe(abc.ABC):
in subclasses (see `ProviderProbe` and `RequestorProbe`).
"""

api: RestApiComponent
"""Component with clients for all three yagna REST APIs."""

runner: "Runner"
"""A runner that created this probe."""

Expand All @@ -75,6 +85,12 @@ class Probe(abc.ABC):
ip_address: Optional[str] = None
"""An IP address of the daemon's container in the Docker network."""

_agents: "OrderedDict[str, AgentComponent]"
"""Collection of agent components that will be started as part of this probe.

Keys are agent names, values are subclasses of `AgentComponent`.
"""

_docker_client: DockerClient
"""A docker client used to create the deamon's container."""

Expand All @@ -96,6 +112,7 @@ def __init__(
log_config: LogConfig,
):
self.runner = runner
self._agents = OrderedDict()
self._docker_client = client
self._logger = ProbeLoggingAdapter(
logger, {ProbeLoggingAdapter.EXTRA_PROBE_NAME: config.name}
Expand All @@ -108,6 +125,11 @@ def __init__(
def __str__(self):
return self.name

@property
def agents(self) -> List[AgentComponent]:
"""List of agent components that will be started as part of this probe."""
return list(self._agents.values())

@property
def address(self) -> Optional[str]:
"""Return address from id marked as default."""
Expand Down Expand Up @@ -141,13 +163,24 @@ def _setup_gftp_proxy(self, config: YagnaContainerConfig) -> YagnaContainerConfi

return new_config

async def start(self) -> None:
"""Start the probe.
def add_agent(self, agent: AgentComponent) -> None:
"""Add an agent to be started for this probe."""
if self._agents.get(agent.name):
raise KeyAlreadyExistsError(
f"Probe already has agent component with name: `{agent.name}`"
)
self._agents[agent.name] = agent

This method is extended in subclasses and mixins.
"""
async def start(self) -> None:
"""Start the probe."""

await self._start_container()
self.api = RestApiComponent(self)

async def start_agents(self):
"""Start all of the probe's agents."""
for agent in self.agents:
await agent.start()

async def stop(self):
"""
Expand All @@ -156,6 +189,8 @@ async def stop(self):
Once stopped, a probe cannot be restarted.
"""
self._logger.info("Stopping probe")
for agent in self.agents:
await agent.stop()
if self.container.logs:
await self.container.logs.stop()

Expand Down Expand Up @@ -312,12 +347,16 @@ def create_probe(

probe: Optional[Probe] = None
try:
logger.debug(
"Creating probe. config=%s, probe_type=%s", config, config.probe_type
)
probe = config.probe_type(runner, docker_client, config, log_config)
for name, value in config.probe_properties.items():
probe.__setattr__(name, value)
yield probe
finally:
if probe:
logger.debug("Removing probe. name=%s", probe.name)
probe.remove()


Expand All @@ -326,34 +365,16 @@ async def run_probe(probe: Probe) -> AsyncIterator[str]:
"""Implement AsyncContextManager for starting and stopping a probe."""

try:
logger.debug("Starting probe. name=%s", probe.name)
await probe.start()
assert probe.ip_address
yield probe.ip_address
finally:
await probe.stop()


class RequestorProbe(ApiClientMixin, Probe):
"""A requestor probe that can make calls to Yagna REST APIs.

This class is used in Level 1 scenarios and as a type of `self`
argument for `Market/Payment/ActivityOperationsMixin` methods.
"""

_api_base_host: str
"""Base hostname for the Yagna API clients."""

def __init__(
self,
runner: "Runner",
client: DockerClient,
config: YagnaContainerConfig,
log_config: LogConfig,
):
super().__init__(runner, client, config, log_config)
host_port = self.container.ports[YAGNA_REST_PORT]
proxy_ip = "127.0.0.1" # use the host-mapped proxy port
self._api_base_host = YAGNA_REST_URL.substitute(host=proxy_ip, port=host_port)
class RequestorProbe(ActivityApiMixin, MarketApiMixin, PaymentApiMixin, Probe):
"""A probe subclass with activity API steps and requestor payment init."""

async def _start_container(self) -> None:
await super()._start_container()
Expand All @@ -362,14 +383,12 @@ async def _start_container(self) -> None:
self.cli.payment_init(sender_mode=True)


class ProviderProbe(AgentMixin, Probe):
class ProviderProbe(MarketApiMixin, PaymentApiMixin, ProviderLogMixin, Probe):
"""A probe subclass that can run a provider agent."""

agent_preset: Optional[str]
"""Name of the preset to be used when placing a market offer."""

subnet: str
"""Name of the subnet to which the provider agent connects."""
provider_agent: ProviderAgentComponent
"""The agent component running `ya-provider` for this probe.
This field is added for convenience to make getting this agent instance easier."""

def __init__(
self,
Expand All @@ -381,20 +400,5 @@ def __init__(
subnet: str = DEFAULT_SUBNET,
):
super().__init__(runner, client, config, log_config)
self.agent_preset = agent_preset
self.subnet = subnet

async def start_agent(self):
"""Start the provider agent and attach to its log stream."""

self._logger.info("Starting ya-provider")

if self.agent_preset:
self.container.exec_run(f"ya-provider preset activate {self.agent_preset}")
self.container.exec_run(f"ya-provider config set --subnet {self.subnet}")

log_stream = self.container.exec_run(
f"ya-provider run" f" --app-key {self.app_key} --node-name {self.name}",
stream=True,
)
self.agent_logs.start(log_stream.output)
self.provider_agent = ProviderAgentComponent(self, subnet, agent_preset)
self.add_agent(self.provider_agent)
Loading