Skip to content

Commit

Permalink
Merge pull request #454 from golemfactory/km/probe-modules
Browse files Browse the repository at this point in the history
Composition over inheritance in probes
  • Loading branch information
kmazurek authored Mar 31, 2021
2 parents 15558c6 + a9d0d89 commit 2edc941
Show file tree
Hide file tree
Showing 22 changed files with 596 additions and 501 deletions.
2 changes: 1 addition & 1 deletion goth/default-assets/goth-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ node-types:
class: "goth.runner.probe.RequestorProbe"

- name: "VM-Wasm-Provider"
class: "goth.runner.provider.ProviderProbeWithLogSteps"
class: "goth.runner.probe.ProviderProbe"
mount:
- read-only: "provider/presets.json"
destination: "/root/.local/share/ya-provider/presets.json"
Expand Down
10 changes: 4 additions & 6 deletions goth/interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

from goth.configuration import Configuration
from goth.runner import Runner
from goth.runner.probe import RequestorProbe
from goth.runner.provider import ProviderProbeWithLogSteps
from goth.runner.probe import ProviderProbe, RequestorProbe


logger = logging.getLogger(__name__)
Expand All @@ -30,7 +29,7 @@ async def start_network(

async with runner(configuration.containers):

providers = runner.get_probes(probe_type=ProviderProbeWithLogSteps)
providers = runner.get_probes(probe_type=ProviderProbe)
requestor = runner.get_probes(probe_type=RequestorProbe)[0]

# Some test steps may be included in the interactive test as well
Expand All @@ -41,9 +40,8 @@ async def start_network(
env = {"PATH": "$PATH"}
requestor.set_agent_env_vars(env)
env_vars = " ".join([f"{key}={val}" for key, val in env.items()])
print(
f"$ {env_vars} examples/blender/blender.py --subnet {providers[0].subnet}"
)
subnet = providers[0].provider_agent.subnet
print(f"$ {env_vars} examples/blender/blender.py --subnet {subnet}")

print("\nPress Ctrl+C at any moment to stop the test harness.\033[0m\n")

Expand Down
12 changes: 3 additions & 9 deletions goth/runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import docker

from goth.runner.agent import AgentMixin
from goth.runner.container.compose import (
ComposeConfig,
ComposeNetworkManager,
Expand Down Expand Up @@ -127,12 +126,12 @@ def get_probes(
def check_assertion_errors(self) -> None:
"""If any monitor reports an assertion error, raise the first error."""

agent_probes = self.get_probes(probe_type=AgentMixin) # type: ignore
probe_agents = chain(*(probe.agents for probe in self.probes))

monitors = chain.from_iterable(
(
(probe.container.logs for probe in self.probes),
(probe.agent_logs for probe in agent_probes),
(agent.log_monitor for agent in probe_agents),
[self.proxy.monitor] if self.proxy else [],
)
)
Expand All @@ -149,9 +148,6 @@ def _create_probes(self, scenario_dir: Path) -> None:
docker_client = docker.from_env()

for config in self._topology:
logger.debug(
"Creating probe. config=%s, probe_type=%s", config, config.probe_type
)
log_config = config.log_config or LogConfig(config.name)
log_config.base_dir = scenario_dir

Expand Down Expand Up @@ -203,9 +199,7 @@ async def _start_nodes(self):
await self._exit_stack.enter_async_context(run_proxy(self.proxy))

# Collect all agent enabled probes and start them in parallel
awaitables = []
for probe in self.get_probes(probe_type=AgentMixin):
awaitables.append(probe.start_agent())
awaitables = [probe.start_agents() for probe in self.probes]
await asyncio.gather(*awaitables)

@property
Expand Down
58 changes: 0 additions & 58 deletions goth/runner/agent.py

This file was deleted.

4 changes: 4 additions & 0 deletions goth/runner/container/compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,11 @@ async def run_compose_network(
"""Implement AsyncContextManager for starting/stopping docker compose network."""

try:
logger.debug(
"Starting compose network. log_dir=%s, force_build=%s", log_dir, force_build
)
await compose_manager.start_network(log_dir, force_build)
yield
finally:
logger.debug("Stopping compose network")
await compose_manager.stop_network()
110 changes: 57 additions & 53 deletions goth/runner/probe.py → goth/runner/probe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Classes and helpers for managing Probes."""
"""Package related to goth's probe interface."""

import abc
import asyncio
from collections import OrderedDict
import contextlib
import copy
import logging
Expand All @@ -10,6 +11,7 @@
AsyncIterator,
Dict,
Iterator,
List,
Optional,
Tuple,
TYPE_CHECKING,
Expand All @@ -19,14 +21,12 @@

from goth.address import (
YAGNA_BUS_URL,
YAGNA_REST_PORT,
YAGNA_REST_URL,
)

from goth import gftp
from goth.node import DEFAULT_SUBNET
from goth.runner.agent import AgentMixin
from goth.runner.api_client import ApiClientMixin
from goth.runner import process
from goth.runner.cli import Cli, YagnaDockerCli
from goth.runner.container.utils import get_container_address
from goth.runner.container.yagna import (
Expand All @@ -37,7 +37,14 @@
from goth.runner.exceptions import KeyAlreadyExistsError
from goth.runner.log import LogConfig, monitored_logger
from goth.runner.log_monitor import PatternMatchingEventMonitor
from goth.runner import process
from goth.runner.probe.agent import AgentComponent, ProviderAgentComponent
from goth.runner.probe.mixin import (
ActivityApiMixin,
MarketApiMixin,
PaymentApiMixin,
ProviderLogMixin,
)
from goth.runner.probe.rest_client import RestApiComponent


if TYPE_CHECKING:
Expand All @@ -63,6 +70,9 @@ class Probe(abc.ABC):
in subclasses (see `ProviderProbe` and `RequestorProbe`).
"""

api: RestApiComponent
"""Component with clients for all three yagna REST APIs."""

runner: "Runner"
"""A runner that created this probe."""

Expand All @@ -75,6 +85,12 @@ class Probe(abc.ABC):
ip_address: Optional[str] = None
"""An IP address of the daemon's container in the Docker network."""

_agents: "OrderedDict[str, AgentComponent]"
"""Collection of agent components that will be started as part of this probe.
Keys are agent names, values are subclasses of `AgentComponent`.
"""

_docker_client: DockerClient
"""A docker client used to create the deamon's container."""

Expand All @@ -96,6 +112,7 @@ def __init__(
log_config: LogConfig,
):
self.runner = runner
self._agents = OrderedDict()
self._docker_client = client
self._logger = ProbeLoggingAdapter(
logger, {ProbeLoggingAdapter.EXTRA_PROBE_NAME: config.name}
Expand All @@ -108,6 +125,11 @@ def __init__(
def __str__(self):
return self.name

@property
def agents(self) -> List[AgentComponent]:
"""List of agent components that will be started as part of this probe."""
return list(self._agents.values())

@property
def address(self) -> Optional[str]:
"""Return address from id marked as default."""
Expand Down Expand Up @@ -141,13 +163,24 @@ def _setup_gftp_proxy(self, config: YagnaContainerConfig) -> YagnaContainerConfi

return new_config

async def start(self) -> None:
"""Start the probe.
def add_agent(self, agent: AgentComponent) -> None:
"""Add an agent to be started for this probe."""
if self._agents.get(agent.name):
raise KeyAlreadyExistsError(
f"Probe already has agent component with name: `{agent.name}`"
)
self._agents[agent.name] = agent

This method is extended in subclasses and mixins.
"""
async def start(self) -> None:
"""Start the probe."""

await self._start_container()
self.api = RestApiComponent(self)

async def start_agents(self):
"""Start all of the probe's agents."""
for agent in self.agents:
await agent.start()

async def stop(self):
"""
Expand All @@ -156,6 +189,8 @@ async def stop(self):
Once stopped, a probe cannot be restarted.
"""
self._logger.info("Stopping probe")
for agent in self.agents:
await agent.stop()
if self.container.logs:
await self.container.logs.stop()

Expand Down Expand Up @@ -312,12 +347,16 @@ def create_probe(

probe: Optional[Probe] = None
try:
logger.debug(
"Creating probe. config=%s, probe_type=%s", config, config.probe_type
)
probe = config.probe_type(runner, docker_client, config, log_config)
for name, value in config.probe_properties.items():
probe.__setattr__(name, value)
yield probe
finally:
if probe:
logger.debug("Removing probe. name=%s", probe.name)
probe.remove()


Expand All @@ -326,34 +365,16 @@ async def run_probe(probe: Probe) -> AsyncIterator[str]:
"""Implement AsyncContextManager for starting and stopping a probe."""

try:
logger.debug("Starting probe. name=%s", probe.name)
await probe.start()
assert probe.ip_address
yield probe.ip_address
finally:
await probe.stop()


class RequestorProbe(ApiClientMixin, Probe):
"""A requestor probe that can make calls to Yagna REST APIs.
This class is used in Level 1 scenarios and as a type of `self`
argument for `Market/Payment/ActivityOperationsMixin` methods.
"""

_api_base_host: str
"""Base hostname for the Yagna API clients."""

def __init__(
self,
runner: "Runner",
client: DockerClient,
config: YagnaContainerConfig,
log_config: LogConfig,
):
super().__init__(runner, client, config, log_config)
host_port = self.container.ports[YAGNA_REST_PORT]
proxy_ip = "127.0.0.1" # use the host-mapped proxy port
self._api_base_host = YAGNA_REST_URL.substitute(host=proxy_ip, port=host_port)
class RequestorProbe(ActivityApiMixin, MarketApiMixin, PaymentApiMixin, Probe):
"""A probe subclass with activity API steps and requestor payment init."""

async def _start_container(self) -> None:
await super()._start_container()
Expand All @@ -362,14 +383,12 @@ async def _start_container(self) -> None:
self.cli.payment_init(sender_mode=True)


class ProviderProbe(AgentMixin, Probe):
class ProviderProbe(MarketApiMixin, PaymentApiMixin, ProviderLogMixin, Probe):
"""A probe subclass that can run a provider agent."""

agent_preset: Optional[str]
"""Name of the preset to be used when placing a market offer."""

subnet: str
"""Name of the subnet to which the provider agent connects."""
provider_agent: ProviderAgentComponent
"""The agent component running `ya-provider` for this probe.
This field is added for convenience to make getting this agent instance easier."""

def __init__(
self,
Expand All @@ -381,20 +400,5 @@ def __init__(
subnet: str = DEFAULT_SUBNET,
):
super().__init__(runner, client, config, log_config)
self.agent_preset = agent_preset
self.subnet = subnet

async def start_agent(self):
"""Start the provider agent and attach to its log stream."""

self._logger.info("Starting ya-provider")

if self.agent_preset:
self.container.exec_run(f"ya-provider preset activate {self.agent_preset}")
self.container.exec_run(f"ya-provider config set --subnet {self.subnet}")

log_stream = self.container.exec_run(
f"ya-provider run" f" --app-key {self.app_key} --node-name {self.name}",
stream=True,
)
self.agent_logs.start(log_stream.output)
self.provider_agent = ProviderAgentComponent(self, subnet, agent_preset)
self.add_agent(self.provider_agent)
Loading

0 comments on commit 2edc941

Please sign in to comment.