From c1e5340976bab54764d8624d9a671e41db6c2738 Mon Sep 17 00:00:00 2001 From: Kuba Mazurek Date: Thu, 18 Mar 2021 18:16:25 +0100 Subject: [PATCH 01/13] Move REST API functionality to base probe class --- goth/runner/{probe.py => probe/__init__.py} | 32 +- .../{api_client.py => probe/rest_client.py} | 23 +- goth/runner/probe/steps.py | 308 ++++++++++++++++++ goth/runner/step.py | 8 +- 4 files changed, 337 insertions(+), 34 deletions(-) rename goth/runner/{probe.py => probe/__init__.py} (95%) rename goth/runner/{api_client.py => probe/rest_client.py} (85%) create mode 100644 goth/runner/probe/steps.py diff --git a/goth/runner/probe.py b/goth/runner/probe/__init__.py similarity index 95% rename from goth/runner/probe.py rename to goth/runner/probe/__init__.py index 04c273834..9d061efb7 100644 --- a/goth/runner/probe.py +++ b/goth/runner/probe/__init__.py @@ -1,4 +1,4 @@ -"""Classes and helpers for managing Probes.""" +"""Package related to goth's probe interface.""" import abc import asyncio @@ -19,7 +19,6 @@ from goth import gftp from goth.node import DEFAULT_SUBNET from goth.runner.agent import AgentMixin -from goth.runner.api_client import ApiClientMixin from goth.runner.cli import Cli, YagnaDockerCli from goth.runner.container.utils import get_container_address from goth.runner.container.yagna import ( @@ -29,6 +28,8 @@ ) from goth.runner.exceptions import KeyAlreadyExistsError from goth.runner.log import LogConfig +from goth.runner.probe.steps import RequestorApiMixin +from goth.runner.probe.rest_client import RestApiComponent from goth.runner.process import run_command if TYPE_CHECKING: @@ -54,6 +55,9 @@ class Probe(abc.ABC): in subclasses (see `ProviderProbe` and `RequestorProbe`). """ + api: RestApiComponent + """Component with clients for all three yagna REST APIs.""" + runner: "Runner" """A runner that created this probe.""" @@ -140,6 +144,13 @@ async def start(self) -> None: await self._start_container() + host_port = self.container.ports[YAGNA_REST_PORT] + proxy_ip = "127.0.0.1" # use the host-mapped proxy port + api_base_host = YAGNA_REST_URL.substitute(host=proxy_ip, port=host_port) + if not self.app_key: + raise RuntimeError("No app key found. container=%s", self.name) + self.api = RestApiComponent(api_base_host, self.app_key) + async def stop(self): """ Stop the probe, removing the Docker container of the daemon being tested. @@ -295,28 +306,13 @@ async def run_probe(probe: Probe) -> AsyncIterator[str]: await probe.stop() -class RequestorProbe(ApiClientMixin, Probe): +class RequestorProbe(RequestorApiMixin, Probe): """A requestor probe that can make calls to Yagna REST APIs. This class is used in Level 1 scenarios and as a type of `self` argument for `Market/Payment/ActivityOperationsMixin` methods. """ - _api_base_host: str - """Base hostname for the Yagna API clients.""" - - def __init__( - self, - runner: "Runner", - client: DockerClient, - config: YagnaContainerConfig, - log_config: LogConfig, - ): - super().__init__(runner, client, config, log_config) - host_port = self.container.ports[YAGNA_REST_PORT] - proxy_ip = "127.0.0.1" # use the host-mapped proxy port - self._api_base_host = YAGNA_REST_URL.substitute(host=proxy_ip, port=host_port) - async def _start_container(self) -> None: await super()._start_container() diff --git a/goth/runner/api_client.py b/goth/runner/probe/rest_client.py similarity index 85% rename from goth/runner/api_client.py rename to goth/runner/probe/rest_client.py index 13800f73a..a46629b5c 100644 --- a/goth/runner/api_client.py +++ b/goth/runner/probe/rest_client.py @@ -48,7 +48,7 @@ class ConfigurationProtocol(Protocol): class ApiModule(Protocol[ConfTVar, ClientTVar]): """Representation of a REST API module. - Used for typing `ApiClientMixin._create_api_client`. + Used for typing `YagnaApiModule._create_api_client`. """ def Configuration(self, host: str) -> ConfTVar: @@ -60,8 +60,8 @@ def ApiClient(self, conf: ConfTVar) -> ClientTVar: pass -class ApiClientMixin: - """Provides client objects for Yagna REST APIs.""" +class RestApiComponent: + """Component with clients for all three yagna REST APIs.""" activity: ActivityApiClient """Activity API client for the requestor daemon.""" @@ -72,23 +72,20 @@ class ApiClientMixin: payment: ya_payment.RequestorApi """Payment API client for the requestor daemon.""" - _api_base_host: str - """Base hostname for the Yagna API clients.""" + _app_key: str - async def start(self): - """Start the probe and initialize the API clients.""" - - await super().start() - self._init_activity_api(self._api_base_host) - self._init_payment_api(self._api_base_host) - self._init_market_api(self._api_base_host) + def __init__(self, base_hostname: str, app_key: str): + self._app_key = app_key + self._init_activity_api(base_hostname) + self._init_payment_api(base_hostname) + self._init_market_api(base_hostname) def _create_api_client( self, api_module: ApiModule[ConfTVar, ClientTVar], api_url: str ) -> ClientTVar: api_url = ensure_no_trailing_slash(str(api_url)) config: ConfTVar = api_module.Configuration(api_url) - config.access_token = self.app_key + config.access_token = self._app_key return api_module.ApiClient(config) def _init_activity_api(self, api_base_host: str) -> None: diff --git a/goth/runner/probe/steps.py b/goth/runner/probe/steps.py new file mode 100644 index 000000000..c2b109a64 --- /dev/null +++ b/goth/runner/probe/steps.py @@ -0,0 +1,308 @@ +"""Probe mixins containing high-level steps.""" + +import asyncio +from datetime import datetime, timedelta +import logging +from typing import ( + Callable, + Iterable, + List, + Optional, + Protocol, + Sequence, + Tuple, + TYPE_CHECKING, +) + +from ya_activity import ExeScriptCommandResult, ExeScriptRequest +from ya_market import AgreementProposal, Demand, DemandOfferBase, Proposal +from ya_payment import Acceptance, Allocation, Invoice + +from goth.node import DEFAULT_SUBNET +from goth.runner.step import step + +if TYPE_CHECKING: + from goth.runner.probe import Probe + from goth.runner.probe.rest_client import RestApiComponent + +logger = logging.getLogger(__name__) + + +class ProbeProtocol(Protocol): + """Protocol class representing the probe interface in mixins. + + This is mainly to fix mypy errors when using `Probe` directly as `self` type. + """ + + api: "RestApiComponent" + """REST API probe component.""" + + +class RequestorApiMixin: + """Provides high-level steps that rely on Yagna Activity API.""" + + @step() + async def create_activity(self: ProbeProtocol, agreement_id: str) -> str: + """Call create_activity on the requestor activity api.""" + + activity_id = await self.api.activity.control.create_activity(agreement_id) + return activity_id + + @step() + async def call_exec(self: ProbeProtocol, activity_id: str, exe_script: str) -> str: + """Call call_exec on the requestor activity api.""" + + script_request = ExeScriptRequest(exe_script) + batch_id = await self.api.activity.control.call_exec( + activity_id, script_request + ) + return batch_id + + @step() + async def collect_results( + self: ProbeProtocol, activity_id: str, batch_id: str, num_results: int + ) -> List[ExeScriptCommandResult]: + """Call collect_results on the requestor activity api.""" + + results: List[ExeScriptCommandResult] = [] + + while len(results) < num_results: + results = await self.api.activity.control.get_exec_batch_results( + activity_id, batch_id + ) + await asyncio.sleep(1.0) + return results + + @step() + async def destroy_activity(self: ProbeProtocol, activity_id: str) -> None: + """Call destroy_activity on the requestor activity api.""" + + await self.api.activity.control.destroy_activity(activity_id) + + @step() + async def subscribe_demand( + self: ProbeProtocol, demand: Demand + ) -> Tuple[str, Demand]: + """Call subscribe demand on the requestor market api.""" + subscription_id = await self.api.market.subscribe_demand(demand) + return subscription_id, demand + + @step() + async def subscribe_template_demand( + self: ProbeProtocol, task_package: str, constraints: str + ) -> Tuple[str, Demand]: + """Build Demand from template and call subscribe demand on market api.""" + + demand = DemandOfferBase( + properties={ + "golem.node.id.name": "test1", + "golem.srv.comp.expiration": int( + (datetime.now() + timedelta(minutes=10)).timestamp() * 1000 + ), + "golem.srv.comp.task_package": task_package, + "golem.node.debug.subnet": DEFAULT_SUBNET, + }, + constraints=constraints, + ) + + return await self.subscribe_demand(demand) # type: ignore + + @step() + async def unsubscribe_demand(self: ProbeProtocol, subscription_id: str) -> None: + """Call unsubscribe demand on the requestor market api.""" + await self.api.market.unsubscribe_demand(subscription_id) + + @step() + async def wait_for_proposals( + self: ProbeProtocol, + subscription_id: str, + providers: Sequence["Probe"], + filter: Optional[Callable[[Proposal], bool]] = lambda p: True, + ) -> List[Proposal]: + """Call collect_offers on the requestor market api. + + Polls collect_offers continously until an offer from each of the given + providers is received. Returns a list of the collected proposals. + """ + proposals: List[Proposal] = [] + provider_ids = {p.address for p in providers} + + while len(proposals) < len(provider_ids): + collected_offers = await self.api.market.collect_offers(subscription_id) + if collected_offers: + logger.debug( + "collect_offers(%s). collected_offers=%r", + subscription_id, + collected_offers, + ) + collected_proposals = [ + offer.proposal + for offer in collected_offers + if ( + offer.proposal.issuer_id in provider_ids + and filter(offer.proposal) + ) + ] + proposals.extend(collected_proposals) + else: + logger.debug( + "Waiting for proposals. subscription_id=%s", subscription_id + ) + + return proposals + + @step() + async def counter_proposal( + self: ProbeProtocol, + subscription_id: str, + demand: Demand, + provider_proposal: Proposal, + ) -> str: + """Call counter_proposal_demand on the requestor market api.""" + + proposal = DemandOfferBase( + constraints=demand.constraints, + properties=demand.properties, + ) + + counter_proposal = await self.api.market.counter_proposal_demand( + subscription_id=subscription_id, + proposal_id=provider_proposal.proposal_id, + demand_offer_base=proposal, + ) + + return counter_proposal + + @step() + async def create_agreement(self: ProbeProtocol, proposal: Proposal) -> str: + """Call create_agreement on the requestor market api.""" + + valid_to = str(datetime.utcnow() + timedelta(days=1)) + "Z" + logger.debug( + "Creating agreement, proposal_id=%s, valid_to=%s", + proposal.proposal_id, + valid_to, + ) + agreement_proposal = AgreementProposal( + proposal_id=proposal.proposal_id, valid_to=valid_to + ) + + agreement_id = await self.api.market.create_agreement(agreement_proposal) + return agreement_id + + @step() + async def confirm_agreement(self: ProbeProtocol, agreement_id: str) -> None: + """Call confirm_agreement on the requestor market api.""" + await self.api.market.confirm_agreement(agreement_id) + + @step() + async def wait_for_approval(self: ProbeProtocol, agreement_id: str) -> None: + """Call wait_for_approval on the requestor market api.""" + await self.api.market.wait_for_approval(agreement_id) + + @step() + async def terminate_agreement( + self: ProbeProtocol, agreement_id: str, reason: Optional[str] + ): + """Call terminate_agreement on the requestor market api.""" + await self.api.market.terminate_agreement( + agreement_id, request_body={"message": "Terminated by requestor"} + ) + + @step() + async def gather_invoices(self: ProbeProtocol, agreement_id: str) -> List[Invoice]: + """Call gather_invoice on the requestor payment api.""" + + invoices: List[Invoice] = [] + + while not invoices: + await asyncio.sleep(2.0) + invoices = await self.api.payment.get_invoices() + invoices = [inv for inv in invoices if inv.agreement_id == agreement_id] + + return invoices + + @step() + async def pay_invoices( + self: ProbeProtocol, invoice_events: Iterable[Invoice] + ) -> None: + """Call accept_invoice on the requestor payment api.""" + + for invoice_event in invoice_events: + allocation = Allocation( + allocation_id="", + total_amount=invoice_event.amount, + spent_amount=0, + remaining_amount=0, + make_deposit=True, + ) + allocation_result = await self.api.payment.create_allocation(allocation) + logger.debug("Created allocation. id=%s", allocation_result) + + acceptance = Acceptance( + total_amount_accepted=invoice_event.amount, + allocation_id=allocation_result.allocation_id, + ) + await self.api.payment.accept_invoice(invoice_event.invoice_id, acceptance) + logger.debug("Accepted invoice. id=%s", invoice_event.invoice_id) + + +class ProviderLogMixin: + """A provider probe with steps that wait for specific messages in agent logs.""" + + @step() + async def wait_for_offer_subscribed(self): + """Wait until the provider agent subscribes to the offer.""" + await self._wait_for_agent_log("Subscribed offer") + + @step() + async def wait_for_proposal_accepted(self): + """Wait until the provider agent subscribes to the offer.""" + await self._wait_for_agent_log("Decided to CounterProposal") + + @step() + async def wait_for_agreement_approved(self): + """Wait until the provider agent subscribes to the offer.""" + await self._wait_for_agent_log("Decided to ApproveAgreement") + + @step() + async def wait_for_exeunit_started(self): + """Wait until the provider agent starts the exe-unit.""" + await self._wait_for_agent_log(r"(.*)\[ExeUnit\](.+)Supervisor initialized$") + + @step() + async def wait_for_exeunit_finished(self): + """Wait until exe-unit finishes.""" + await self._wait_for_agent_log( + "ExeUnit process exited with status Finished - exit code: 0" + ) + + @step() + async def wait_for_agreement_terminated(self): + """Wait until Agreement will be terminated. + + This can happen for 2 reasons (both caught by this function): + - Requestor terminates - most common case + - Provider terminates - it happens for compatibility with previous + versions of API without `terminate` endpoint implemented. Moreover + Provider can terminate, because Agreements condition where broken. + """ + await self._wait_for_agent_log(r"Agreement \[.*\] terminated by") + + @step() + async def wait_for_agreement_cleanup(self): + """Wait until Provider will cleanup all allocated resources. + + This can happen before or after Agreement terminated log will be printed. + """ + await self._wait_for_agent_log(r"Agreement \[.*\] cleanup finished.") + + @step() + async def wait_for_invoice_sent(self): + """Wait until the invoice is sent.""" + await self._wait_for_agent_log("Invoice (.+) sent") + + @step(default_timeout=300) + async def wait_for_invoice_paid(self): + """Wait until the invoice is paid.""" + await self._wait_for_agent_log("Invoice .+? for agreement .+? was paid") diff --git a/goth/runner/step.py b/goth/runner/step.py index 304715e04..513adcf61 100644 --- a/goth/runner/step.py +++ b/goth/runner/step.py @@ -3,10 +3,12 @@ import functools import logging import time -from typing import Optional +from typing import Optional, TYPE_CHECKING from goth.runner.exceptions import StepTimeoutError -from goth.runner.probe import Probe + +if TYPE_CHECKING: + from goth.runner.probe import Probe logger = logging.getLogger(__name__) @@ -17,7 +19,7 @@ def step(default_timeout: float = 10.0): def decorator(func): @functools.wraps(func) - async def wrapper(self: Probe, *args, timeout: Optional[float] = None): + async def wrapper(self: "Probe", *args, timeout: Optional[float] = None): timeout = timeout if timeout is not None else default_timeout step_name = f"{self.name}.{func.__name__}(timeout={timeout})" start_time = time.time() From 8156353d3d4456369e1c30b7a49f013d749e0589 Mon Sep 17 00:00:00 2001 From: Kuba Mazurek Date: Wed, 24 Mar 2021 13:59:12 +0100 Subject: [PATCH 02/13] Introduce agent components to Probe --- goth/runner/__init__.py | 11 ++-- goth/runner/agent.py | 58 -------------------- goth/runner/probe/__init__.py | 40 ++++---------- goth/runner/probe/agent.py | 92 ++++++++++++++++++++++++++++++++ goth/runner/probe/component.py | 21 ++++++++ goth/runner/probe/rest_client.py | 26 +++++---- goth/runner/provider.py | 18 +++---- 7 files changed, 154 insertions(+), 112 deletions(-) delete mode 100644 goth/runner/agent.py create mode 100644 goth/runner/probe/agent.py create mode 100644 goth/runner/probe/component.py diff --git a/goth/runner/__init__.py b/goth/runner/__init__.py index a82eada05..a150a1247 100644 --- a/goth/runner/__init__.py +++ b/goth/runner/__init__.py @@ -20,7 +20,6 @@ import docker -from goth.runner.agent import AgentMixin from goth.runner.container.compose import ( ComposeConfig, ComposeNetworkManager, @@ -127,12 +126,14 @@ def get_probes( def check_assertion_errors(self) -> None: """If any monitor reports an assertion error, raise the first error.""" - agent_probes = self.get_probes(probe_type=AgentMixin) # type: ignore + probe_agents = [] + for probe in self.probes: + probe_agents.extend(chain(probe.agents)) monitors = chain.from_iterable( ( (probe.container.logs for probe in self.probes), - (probe.agent_logs for probe in agent_probes), + (agent.log_monitor for agent in probe_agents), [self.proxy.monitor] if self.proxy else [], ) ) @@ -204,8 +205,8 @@ async def _start_nodes(self): # Collect all agent enabled probes and start them in parallel awaitables = [] - for probe in self.get_probes(probe_type=AgentMixin): - awaitables.append(probe.start_agent()) + for probe in self.probes: + awaitables.extend([a.start() for a in probe.agents]) await asyncio.gather(*awaitables) @property diff --git a/goth/runner/agent.py b/goth/runner/agent.py deleted file mode 100644 index 4455d8f22..000000000 --- a/goth/runner/agent.py +++ /dev/null @@ -1,58 +0,0 @@ -"""Module for adding yagna agent functionality to `Probe` subclasses.""" -import abc -import logging - -from goth.runner.log import LogConfig -from goth.runner.log_monitor import LogEvent, LogEventMonitor - -logger = logging.getLogger(__name__) - - -class AgentMixin(abc.ABC): - """Mixin class which extends a `Probe` to allow for running an agent. - - The agent can be an arbitrary binary executed within the container. The abstract - method `start_agent` is where this binary should be started. - This mixin also includes logic for saving and monitoring the agent logs. - """ - - agent_logs: LogEventMonitor - """Monitor and buffer for agent logs, enables asserting for certain lines to be - present in the log buffer. - """ - - @abc.abstractmethod - def start_agent(self): - """Start the agent binary. - - To enable the log monitor, this method must call `start` on `agent_logs`, - passing in the log stream from the agent binary. - """ - - async def start(self): - """Start the probe and initialize the log monitor.""" - self.agent_logs = None - await super().start() - self._init_log_monitor() - - async def stop(self): - """Stop the probe and the log monitor.""" - await super().stop() - if self.agent_logs: - await self.agent_logs.stop() - - def _init_log_monitor(self): - name = f"{self.name}_agent" - log_config = LogConfig(file_name=name) - if self.container.log_config: - log_config.base_dir = self.container.log_config.base_dir - - self.agent_logs = LogEventMonitor(name, log_config) - self._last_checked_line = -1 - - async def _wait_for_agent_log( - self, pattern: str, timeout: float = 1000 - ) -> LogEvent: - """Search agent logs for a log line with the message matching `pattern`.""" - entry = await self.agent_logs.wait_for_entry(pattern, timeout) - return entry diff --git a/goth/runner/probe/__init__.py b/goth/runner/probe/__init__.py index 9d061efb7..75a79dd63 100644 --- a/goth/runner/probe/__init__.py +++ b/goth/runner/probe/__init__.py @@ -6,7 +6,7 @@ import copy import logging from pathlib import Path -from typing import AsyncIterator, Dict, Iterator, Optional, TYPE_CHECKING +from typing import AsyncIterator, Dict, Iterator, List, Optional, TYPE_CHECKING from docker import DockerClient @@ -18,7 +18,6 @@ from goth import gftp from goth.node import DEFAULT_SUBNET -from goth.runner.agent import AgentMixin from goth.runner.cli import Cli, YagnaDockerCli from goth.runner.container.utils import get_container_address from goth.runner.container.yagna import ( @@ -28,6 +27,7 @@ ) from goth.runner.exceptions import KeyAlreadyExistsError from goth.runner.log import LogConfig +from goth.runner.probe.agent import AgentComponent, ProviderAgentComponent from goth.runner.probe.steps import RequestorApiMixin from goth.runner.probe.rest_client import RestApiComponent from goth.runner.process import run_command @@ -55,6 +55,9 @@ class Probe(abc.ABC): in subclasses (see `ProviderProbe` and `RequestorProbe`). """ + agents: List[AgentComponent] + """List of agent components that will be started as part of this probe.""" + api: RestApiComponent """Component with clients for all three yagna REST APIs.""" @@ -90,6 +93,7 @@ def __init__( config: YagnaContainerConfig, log_config: LogConfig, ): + self.agents = [] self.runner = runner self._docker_client = client self._logger = ProbeLoggingAdapter( @@ -147,9 +151,7 @@ async def start(self) -> None: host_port = self.container.ports[YAGNA_REST_PORT] proxy_ip = "127.0.0.1" # use the host-mapped proxy port api_base_host = YAGNA_REST_URL.substitute(host=proxy_ip, port=host_port) - if not self.app_key: - raise RuntimeError("No app key found. container=%s", self.name) - self.api = RestApiComponent(api_base_host, self.app_key) + self.api = RestApiComponent(self, api_base_host) async def stop(self): """ @@ -158,6 +160,8 @@ async def stop(self): Once stopped, a probe cannot be restarted. """ self._logger.info("Stopping probe") + for agent in self.agents: + await agent.stop() if self.container.logs: await self.container.logs.stop() @@ -320,15 +324,9 @@ async def _start_container(self) -> None: self.cli.payment_init(sender_mode=True) -class ProviderProbe(AgentMixin, Probe): +class ProviderProbe(Probe): """A probe subclass that can run a provider agent.""" - agent_preset: Optional[str] - """Name of the preset to be used when placing a market offer.""" - - subnet: str - """Name of the subnet to which the provider agent connects.""" - def __init__( self, runner: "Runner", @@ -339,20 +337,4 @@ def __init__( subnet: str = DEFAULT_SUBNET, ): super().__init__(runner, client, config, log_config) - self.agent_preset = agent_preset - self.subnet = subnet - - async def start_agent(self): - """Start the provider agent and attach to its log stream.""" - - self._logger.info("Starting ya-provider") - - if self.agent_preset: - self.container.exec_run(f"ya-provider preset activate {self.agent_preset}") - self.container.exec_run(f"ya-provider config set --subnet {self.subnet}") - - log_stream = self.container.exec_run( - f"ya-provider run" f" --app-key {self.app_key} --node-name {self.name}", - stream=True, - ) - self.agent_logs.start(log_stream.output) + self.agents.append(ProviderAgentComponent(self, subnet, agent_preset)) diff --git a/goth/runner/probe/agent.py b/goth/runner/probe/agent.py new file mode 100644 index 000000000..66e104415 --- /dev/null +++ b/goth/runner/probe/agent.py @@ -0,0 +1,92 @@ +"""Module for agent components to be used with `Probe` objects.""" +import abc +import logging +from typing import Optional, TYPE_CHECKING + +from goth.runner.log import LogConfig +from goth.runner.log_monitor import LogEvent, LogEventMonitor +from goth.runner.probe.component import ProbeComponent + +if TYPE_CHECKING: + from goth.runner.probe import Probe + +logger = logging.getLogger(__name__) + + +class AgentComponent(ProbeComponent, abc.ABC): + """Probe component responsible for managing an agent binary. + + The agent can be an arbitrary binary executed in the container or on host. + This class also includes logic for monitoring the agent logs and writing + them to a file. + """ + + log_monitor: LogEventMonitor + """Monitor and buffer for agent logs, enables asserting for certain lines to be + present in the log buffer. + """ + + @property + @abc.abstractmethod + def name(self) -> str: + """Name of this agent component to be used when creating its log file.""" + + async def start(self, *args, **kwargs): + """Start the agent binary and initialize the internal log monitor.""" + self._init_log_monitor() + + async def stop(self, *args, **kwargs): + """Stop the agent binary and its log monitor.""" + if self.log_monitor: + await self.log_monitor.stop() + + def _init_log_monitor(self): + probe = self.probe + log_config = LogConfig(file_name=self.name) + + if probe.container.log_config: + log_config.base_dir = probe.container.log_config.base_dir + + self.log_monitor = LogEventMonitor(self.name, log_config) + self._last_checked_line = -1 + + async def wait_for_log(self, pattern: str, timeout: float = 1000) -> LogEvent: + """Search agent logs for a log line with the message matching `pattern`.""" + entry = await self.log_monitor.wait_for_entry(pattern, timeout) + return entry + + +class ProviderAgentComponent(AgentComponent): + """Probe component which runs `ya-provider` in the probe's container.""" + + agent_preset: Optional[str] + """Name of the preset to be used when placing a market offer.""" + + subnet: str + """Name of the subnet to which the provider agent connects.""" + + @property + def name(self) -> str: + """Name of this agent component to be used when creating its log file.""" + return "{self.probe.name}_ya-provider_agent" + + def __init__(self, probe: "Probe", subnet: str, agent_preset: Optional[str] = None): + super().__init__(probe) + self.agent_preset = agent_preset + self.subnet = subnet + + async def start(self): + """Start the provider agent and attach to its log stream.""" + await super().start() + probe = self.probe + probe._logger.info("Starting ya-provider") + + if self.agent_preset: + probe.container.exec_run(f"ya-provider preset activate {self.agent_preset}") + probe.container.exec_run(f"ya-provider config set --subnet {self.subnet}") + + log_stream = probe.container.exec_run( + f"ya-provider run" f" --app-key {probe.app_key} --node-name {probe.name}", + stream=True, + ) + self.log_monitor.start(log_stream.output) diff --git a/goth/runner/probe/component.py b/goth/runner/probe/component.py new file mode 100644 index 000000000..75825329f --- /dev/null +++ b/goth/runner/probe/component.py @@ -0,0 +1,21 @@ +"""Module containing the base class for probe components.""" + +import abc +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from goth.runner.probe import Probe + + +class ProbeComponent(abc.ABC): + """Base class for a probe component. + + This serves as a common interface for all classes which can be part of the `Probe` + class (as in: composition over inheritance). + """ + + def __init__(self, probe: "Probe"): + self.probe = probe + + probe: "Probe" + """Probe instance containing this component.""" diff --git a/goth/runner/probe/rest_client.py b/goth/runner/probe/rest_client.py index a46629b5c..bde656eeb 100644 --- a/goth/runner/probe/rest_client.py +++ b/goth/runner/probe/rest_client.py @@ -1,7 +1,7 @@ """Module containing classes related to the yagna REST API client.""" import dataclasses import logging -from typing import TypeVar +from typing import TypeVar, TYPE_CHECKING from typing_extensions import Protocol @@ -15,6 +15,10 @@ MARKET_API_URL, PAYMENT_API_URL, ) +from goth.runner.probe.component import ProbeComponent + +if TYPE_CHECKING: + from goth.runner.probe import Probe logger = logging.getLogger(__name__) @@ -60,22 +64,20 @@ def ApiClient(self, conf: ConfTVar) -> ClientTVar: pass -class RestApiComponent: - """Component with clients for all three yagna REST APIs.""" +class RestApiComponent(ProbeComponent): + """Component with clients for yagna REST APIs.""" activity: ActivityApiClient - """Activity API client for the requestor daemon.""" + """Activity API client.""" market: ya_market.RequestorApi - """Market API client for the requestor daemon.""" + """Market API client.""" payment: ya_payment.RequestorApi - """Payment API client for the requestor daemon.""" - - _app_key: str + """Payment API client.""" - def __init__(self, base_hostname: str, app_key: str): - self._app_key = app_key + def __init__(self, probe: "Probe", base_hostname: str): + super().__init__(probe) self._init_activity_api(base_hostname) self._init_payment_api(base_hostname) self._init_market_api(base_hostname) @@ -85,7 +87,9 @@ def _create_api_client( ) -> ClientTVar: api_url = ensure_no_trailing_slash(str(api_url)) config: ConfTVar = api_module.Configuration(api_url) - config.access_token = self._app_key + if not self.probe.app_key: + raise RuntimeError("No app key found. probe=%s", self.probe.name) + config.access_token = self.probe.app_key return api_module.ApiClient(config) def _init_activity_api(self, api_base_host: str) -> None: diff --git a/goth/runner/provider.py b/goth/runner/provider.py index 0795cb39d..650a3235d 100644 --- a/goth/runner/provider.py +++ b/goth/runner/provider.py @@ -10,27 +10,27 @@ class ProviderProbeWithLogSteps(ProviderProbe): @step() async def wait_for_offer_subscribed(self): """Wait until the provider agent subscribes to the offer.""" - await self._wait_for_agent_log("Subscribed offer") + await self.agents[0].wait_for_log("Subscribed offer") @step() async def wait_for_proposal_accepted(self): """Wait until the provider agent subscribes to the offer.""" - await self._wait_for_agent_log("Decided to CounterProposal") + await self.agents[0].wait_for_log("Decided to CounterProposal") @step() async def wait_for_agreement_approved(self): """Wait until the provider agent subscribes to the offer.""" - await self._wait_for_agent_log("Decided to ApproveAgreement") + await self.agents[0].wait_for_log("Decided to ApproveAgreement") @step() async def wait_for_exeunit_started(self): """Wait until the provider agent starts the exe-unit.""" - await self._wait_for_agent_log(r"(.*)\[ExeUnit\](.+)Supervisor initialized$") + await self.agents[0].wait_for_log(r"(.*)\[ExeUnit\](.+)Supervisor initialized$") @step() async def wait_for_exeunit_finished(self): """Wait until exe-unit finishes.""" - await self._wait_for_agent_log( + await self.agents[0].wait_for_log( "ExeUnit process exited with status Finished - exit code: 0" ) @@ -44,7 +44,7 @@ async def wait_for_agreement_terminated(self): versions of API without `terminate` endpoint implemented. Moreover Provider can terminate, because Agreements condition where broken. """ - await self._wait_for_agent_log(r"Agreement \[.*\] terminated by") + await self.agents[0].wait_for_log(r"Agreement \[.*\] terminated by") @step() async def wait_for_agreement_cleanup(self): @@ -52,14 +52,14 @@ async def wait_for_agreement_cleanup(self): This can happen before or after Agreement terminated log will be printed. """ - await self._wait_for_agent_log(r"Agreement \[.*\] cleanup finished.") + await self.agents[0].wait_for_log(r"Agreement \[.*\] cleanup finished.") @step() async def wait_for_invoice_sent(self): """Wait until the invoice is sent.""" - await self._wait_for_agent_log("Invoice (.+) sent") + await self.agents[0].wait_for_log("Invoice (.+) sent") @step(default_timeout=300) async def wait_for_invoice_paid(self): """Wait until the invoice is paid.""" - await self._wait_for_agent_log("Invoice .+? for agreement .+? was paid") + await self.agents[0].wait_for_log("Invoice .+? for agreement .+? was paid") From 1b50616f5857838be2f949b4dbe683f6fc230493 Mon Sep 17 00:00:00 2001 From: Kuba Mazurek Date: Wed, 24 Mar 2021 17:17:38 +0100 Subject: [PATCH 03/13] Move daemon address setup to RestApiComponent --- goth/runner/probe/__init__.py | 7 +------ goth/runner/probe/rest_client.py | 11 ++++++++++- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/goth/runner/probe/__init__.py b/goth/runner/probe/__init__.py index 75a79dd63..98a38ca8d 100644 --- a/goth/runner/probe/__init__.py +++ b/goth/runner/probe/__init__.py @@ -12,7 +12,6 @@ from goth.address import ( YAGNA_BUS_URL, - YAGNA_REST_PORT, YAGNA_REST_URL, ) @@ -147,11 +146,7 @@ async def start(self) -> None: """ await self._start_container() - - host_port = self.container.ports[YAGNA_REST_PORT] - proxy_ip = "127.0.0.1" # use the host-mapped proxy port - api_base_host = YAGNA_REST_URL.substitute(host=proxy_ip, port=host_port) - self.api = RestApiComponent(self, api_base_host) + self.api = RestApiComponent(self) async def stop(self): """ diff --git a/goth/runner/probe/rest_client.py b/goth/runner/probe/rest_client.py index bde656eeb..7ccfe72a2 100644 --- a/goth/runner/probe/rest_client.py +++ b/goth/runner/probe/rest_client.py @@ -14,6 +14,8 @@ ACTIVITY_API_URL, MARKET_API_URL, PAYMENT_API_URL, + YAGNA_REST_PORT, + YAGNA_REST_URL, ) from goth.runner.probe.component import ProbeComponent @@ -76,8 +78,15 @@ class RestApiComponent(ProbeComponent): payment: ya_payment.RequestorApi """Payment API client.""" - def __init__(self, probe: "Probe", base_hostname: str): + def __init__(self, probe: "Probe"): super().__init__(probe) + + # We reach the daemon through MITM proxy running on localhost using the + # container's unique port mapping + host_port = probe.container.ports[YAGNA_REST_PORT] + proxy_ip = "127.0.0.1" + base_hostname = YAGNA_REST_URL.substitute(host=proxy_ip, port=host_port) + self._init_activity_api(base_hostname) self._init_payment_api(base_hostname) self._init_market_api(base_hostname) From 4a4c94b1bf1d8f1117710808fd41abd97b2ac3fd Mon Sep 17 00:00:00 2001 From: Kuba Mazurek Date: Wed, 24 Mar 2021 18:09:04 +0100 Subject: [PATCH 04/13] Use a set for storing probe agents --- goth/runner/probe/__init__.py | 10 +++---- goth/runner/probe/agent.py | 29 ++++++++++++++------- test/goth/runner/probe/test_agent.py | 39 ++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 15 deletions(-) create mode 100644 test/goth/runner/probe/test_agent.py diff --git a/goth/runner/probe/__init__.py b/goth/runner/probe/__init__.py index 98a38ca8d..55796ae1f 100644 --- a/goth/runner/probe/__init__.py +++ b/goth/runner/probe/__init__.py @@ -6,7 +6,7 @@ import copy import logging from pathlib import Path -from typing import AsyncIterator, Dict, Iterator, List, Optional, TYPE_CHECKING +from typing import AsyncIterator, Dict, Iterator, Optional, Set, TYPE_CHECKING from docker import DockerClient @@ -54,8 +54,8 @@ class Probe(abc.ABC): in subclasses (see `ProviderProbe` and `RequestorProbe`). """ - agents: List[AgentComponent] - """List of agent components that will be started as part of this probe.""" + agents: Set[AgentComponent] + """Set of agent components that will be started as part of this probe.""" api: RestApiComponent """Component with clients for all three yagna REST APIs.""" @@ -92,7 +92,7 @@ def __init__( config: YagnaContainerConfig, log_config: LogConfig, ): - self.agents = [] + self.agents = set() self.runner = runner self._docker_client = client self._logger = ProbeLoggingAdapter( @@ -332,4 +332,4 @@ def __init__( subnet: str = DEFAULT_SUBNET, ): super().__init__(runner, client, config, log_config) - self.agents.append(ProviderAgentComponent(self, subnet, agent_preset)) + self.agents.add(ProviderAgentComponent(self, subnet, agent_preset)) diff --git a/goth/runner/probe/agent.py b/goth/runner/probe/agent.py index 66e104415..0515acb27 100644 --- a/goth/runner/probe/agent.py +++ b/goth/runner/probe/agent.py @@ -26,10 +26,16 @@ class AgentComponent(ProbeComponent, abc.ABC): present in the log buffer. """ - @property - @abc.abstractmethod - def name(self) -> str: - """Name of this agent component to be used when creating its log file.""" + name: str + """Name of this agent to be used when creating its log file. + This should be unique when compared to other agents running as part of a + single test. A good example includes both the probe's name, as well as the + type of the agent itself, e.g.: `provider_1_ya-provider`. + """ + + def __init__(self, probe: "Probe", name: str): + super().__init__(probe) + self.name = name async def start(self, *args, **kwargs): """Start the agent binary and initialize the internal log monitor.""" @@ -55,6 +61,14 @@ async def wait_for_log(self, pattern: str, timeout: float = 1000) -> LogEvent: entry = await self.log_monitor.wait_for_entry(pattern, timeout) return entry + def __hash__(self): + return hash(self.name) + + def __eq__(self, other): + if type(self) is type(other): + return self.__hash__() == other.__hash__() + return False + class ProviderAgentComponent(AgentComponent): """Probe component which runs `ya-provider` in the probe's container.""" @@ -65,13 +79,8 @@ class ProviderAgentComponent(AgentComponent): subnet: str """Name of the subnet to which the provider agent connects.""" - @property - def name(self) -> str: - """Name of this agent component to be used when creating its log file.""" - return "{self.probe.name}_ya-provider_agent" - def __init__(self, probe: "Probe", subnet: str, agent_preset: Optional[str] = None): - super().__init__(probe) + super().__init__(probe, f"{probe.name}_ya-provider") self.agent_preset = agent_preset self.subnet = subnet diff --git a/test/goth/runner/probe/test_agent.py b/test/goth/runner/probe/test_agent.py new file mode 100644 index 000000000..1e56de158 --- /dev/null +++ b/test/goth/runner/probe/test_agent.py @@ -0,0 +1,39 @@ +"""Unit tests for probe agents.""" + +from unittest.mock import MagicMock + +from goth.runner.probe import Probe +from goth.runner.probe.agent import AgentComponent + + +def test_agent_duplicate_names(): + """Test if two agent objects with the same name will be treated as equal.""" + + class TestAgentComponent(AgentComponent): + pass + + agents = set() + agent_name = "test_agent" + probe = MagicMock(spec=Probe) + + first_agent = TestAgentComponent(probe, agent_name) + agents.add(first_agent) + agents.add(TestAgentComponent(probe, agent_name)) + + assert len(agents) == 1 + assert agents.pop() == first_agent + + +def test_agent_different_names(): + """Test if two agent objects with different names will be treated as different.""" + + class TestAgentComponent(AgentComponent): + pass + + agents = set() + probe = MagicMock(spec=Probe) + + agents.add(TestAgentComponent(probe, "agent_1")) + agents.add(TestAgentComponent(probe, "agent_2")) + + assert len(agents) == 2 From fe4f9d1715522e1e39ec18f41a3642940af8c82c Mon Sep 17 00:00:00 2001 From: Kuba Mazurek Date: Wed, 24 Mar 2021 19:15:27 +0100 Subject: [PATCH 05/13] Reduce probe class hierarchy --- goth/default-assets/goth-config.yml | 2 +- goth/interactive.py | 10 +- goth/runner/probe/__init__.py | 23 +- goth/runner/probe/{steps.py => mixin.py} | 201 ++++++++------ goth/runner/provider.py | 65 ----- goth/runner/requestor.py | 246 ------------------ test/yagna/e2e/test_e2e_vm.py | 13 +- test/yagna/e2e/test_e2e_wasm.py | 13 +- test/yagna/helpers/negotiation.py | 12 +- test/yagna/helpers/payment.py | 7 +- test/yagna/interactive/test_interactive_vm.py | 12 +- .../module/payments/test_zero_amount_txs.py | 11 +- .../test_provider_multi_activity.py | 11 +- 13 files changed, 171 insertions(+), 455 deletions(-) rename goth/runner/probe/{steps.py => mixin.py} (72%) delete mode 100644 goth/runner/provider.py delete mode 100644 goth/runner/requestor.py diff --git a/goth/default-assets/goth-config.yml b/goth/default-assets/goth-config.yml index b976d6edc..523b4a320 100644 --- a/goth/default-assets/goth-config.yml +++ b/goth/default-assets/goth-config.yml @@ -38,7 +38,7 @@ node-types: class: "goth.runner.probe.RequestorProbe" - name: "VM-Wasm-Provider" - class: "goth.runner.provider.ProviderProbeWithLogSteps" + class: "goth.runner.probe.ProviderProbe" mount: - read-only: "provider/presets.json" destination: "/root/.local/share/ya-provider/presets.json" diff --git a/goth/interactive.py b/goth/interactive.py index 12d22ccac..871abb8c4 100644 --- a/goth/interactive.py +++ b/goth/interactive.py @@ -6,8 +6,7 @@ from goth.configuration import Configuration from goth.runner import Runner -from goth.runner.probe import RequestorProbe -from goth.runner.provider import ProviderProbeWithLogSteps +from goth.runner.probe import ProviderProbe, RequestorProbe logger = logging.getLogger(__name__) @@ -30,7 +29,7 @@ async def start_network( async with runner(configuration.containers): - providers = runner.get_probes(probe_type=ProviderProbeWithLogSteps) + providers = runner.get_probes(probe_type=ProviderProbe) requestor = runner.get_probes(probe_type=RequestorProbe)[0] # Some test steps may be included in the interactive test as well @@ -41,9 +40,8 @@ async def start_network( env = {"PATH": "$PATH"} requestor.set_agent_env_vars(env) env_vars = " ".join([f"{key}={val}" for key, val in env.items()]) - print( - f"$ {env_vars} examples/blender/blender.py --subnet {providers[0].subnet}" - ) + subnet = providers[0].provider_agent.subnet + print(f"$ {env_vars} examples/blender/blender.py --subnet {subnet}") print("\nPress Ctrl+C at any moment to stop the test harness.\033[0m\n") diff --git a/goth/runner/probe/__init__.py b/goth/runner/probe/__init__.py index 55796ae1f..7794897f9 100644 --- a/goth/runner/probe/__init__.py +++ b/goth/runner/probe/__init__.py @@ -27,7 +27,12 @@ from goth.runner.exceptions import KeyAlreadyExistsError from goth.runner.log import LogConfig from goth.runner.probe.agent import AgentComponent, ProviderAgentComponent -from goth.runner.probe.steps import RequestorApiMixin +from goth.runner.probe.mixin import ( + ActivityApiMixin, + MarketApiMixin, + PaymentApiMixin, + ProviderLogMixin, +) from goth.runner.probe.rest_client import RestApiComponent from goth.runner.process import run_command @@ -140,10 +145,7 @@ def _setup_gftp_proxy(self, config: YagnaContainerConfig) -> YagnaContainerConfi return new_config async def start(self) -> None: - """Start the probe. - - This method is extended in subclasses and mixins. - """ + """Start the probe.""" await self._start_container() self.api = RestApiComponent(self) @@ -305,7 +307,7 @@ async def run_probe(probe: Probe) -> AsyncIterator[str]: await probe.stop() -class RequestorProbe(RequestorApiMixin, Probe): +class RequestorProbe(ActivityApiMixin, MarketApiMixin, PaymentApiMixin, Probe): """A requestor probe that can make calls to Yagna REST APIs. This class is used in Level 1 scenarios and as a type of `self` @@ -319,9 +321,13 @@ async def _start_container(self) -> None: self.cli.payment_init(sender_mode=True) -class ProviderProbe(Probe): +class ProviderProbe(MarketApiMixin, PaymentApiMixin, ProviderLogMixin, Probe): """A probe subclass that can run a provider agent.""" + provider_agent: ProviderAgentComponent + """The agent component running `ya-provider` for this probe. + This field is added for convenience to make getting this agent instance easier.""" + def __init__( self, runner: "Runner", @@ -332,4 +338,5 @@ def __init__( subnet: str = DEFAULT_SUBNET, ): super().__init__(runner, client, config, log_config) - self.agents.add(ProviderAgentComponent(self, subnet, agent_preset)) + self.provider_agent = ProviderAgentComponent(self, subnet, agent_preset) + self.agents.add(self.provider_agent) diff --git a/goth/runner/probe/steps.py b/goth/runner/probe/mixin.py similarity index 72% rename from goth/runner/probe/steps.py rename to goth/runner/probe/mixin.py index c2b109a64..ae007b3e0 100644 --- a/goth/runner/probe/steps.py +++ b/goth/runner/probe/mixin.py @@ -9,6 +9,7 @@ List, Optional, Protocol, + Set, Sequence, Tuple, TYPE_CHECKING, @@ -23,6 +24,7 @@ if TYPE_CHECKING: from goth.runner.probe import Probe + from goth.runner.probe.agent import AgentComponent, ProviderAgentComponent from goth.runner.probe.rest_client import RestApiComponent logger = logging.getLogger(__name__) @@ -34,23 +36,29 @@ class ProbeProtocol(Protocol): This is mainly to fix mypy errors when using `Probe` directly as `self` type. """ + agents: "Set[AgentComponent]" + """Set of agent components that will be started as part of a probe.""" + api: "RestApiComponent" """REST API probe component.""" + name: str + """Name of the probe container.""" + -class RequestorApiMixin: - """Provides high-level steps that rely on Yagna Activity API.""" +class ActivityApiMixin: + """Probe mixin providing high-level test steps which use yagna activity API.""" @step() async def create_activity(self: ProbeProtocol, agreement_id: str) -> str: - """Call create_activity on the requestor activity api.""" + """Call create_activity on the activity api.""" activity_id = await self.api.activity.control.create_activity(agreement_id) return activity_id @step() async def call_exec(self: ProbeProtocol, activity_id: str, exe_script: str) -> str: - """Call call_exec on the requestor activity api.""" + """Call call_exec on the activity api.""" script_request = ExeScriptRequest(exe_script) batch_id = await self.api.activity.control.call_exec( @@ -62,7 +70,7 @@ async def call_exec(self: ProbeProtocol, activity_id: str, exe_script: str) -> s async def collect_results( self: ProbeProtocol, activity_id: str, batch_id: str, num_results: int ) -> List[ExeScriptCommandResult]: - """Call collect_results on the requestor activity api.""" + """Call collect_results on the activity api.""" results: List[ExeScriptCommandResult] = [] @@ -75,15 +83,63 @@ async def collect_results( @step() async def destroy_activity(self: ProbeProtocol, activity_id: str) -> None: - """Call destroy_activity on the requestor activity api.""" + """Call destroy_activity on the activity api.""" await self.api.activity.control.destroy_activity(activity_id) + +class MarketApiMixin: + """Probe mixin providing high-level test steps which use yagna market API.""" + + @step() + async def confirm_agreement(self: ProbeProtocol, agreement_id: str) -> None: + """Call confirm_agreement on the market api.""" + await self.api.market.confirm_agreement(agreement_id) + + @step() + async def counter_proposal( + self: ProbeProtocol, + subscription_id: str, + demand: Demand, + provider_proposal: Proposal, + ) -> str: + """Call counter_proposal_demand on the market api.""" + + proposal = DemandOfferBase( + constraints=demand.constraints, + properties=demand.properties, + ) + + counter_proposal = await self.api.market.counter_proposal_demand( + subscription_id=subscription_id, + proposal_id=provider_proposal.proposal_id, + demand_offer_base=proposal, + ) + + return counter_proposal + + @step() + async def create_agreement(self: ProbeProtocol, proposal: Proposal) -> str: + """Call create_agreement on the market api.""" + + valid_to = str(datetime.utcnow() + timedelta(days=1)) + "Z" + logger.debug( + "Creating agreement, proposal_id=%s, valid_to=%s", + proposal.proposal_id, + valid_to, + ) + agreement_proposal = AgreementProposal( + proposal_id=proposal.proposal_id, valid_to=valid_to + ) + + agreement_id = await self.api.market.create_agreement(agreement_proposal) + return agreement_id + @step() async def subscribe_demand( self: ProbeProtocol, demand: Demand ) -> Tuple[str, Demand]: - """Call subscribe demand on the requestor market api.""" + """Call subscribe demand on the market api.""" subscription_id = await self.api.market.subscribe_demand(demand) return subscription_id, demand @@ -107,11 +163,25 @@ async def subscribe_template_demand( return await self.subscribe_demand(demand) # type: ignore + @step() + async def terminate_agreement( + self: ProbeProtocol, agreement_id: str, reason: Optional[str] + ): + """Call terminate_agreement on the market api.""" + await self.api.market.terminate_agreement( + agreement_id, request_body={"message": f"Terminated by {self.name}"} + ) + @step() async def unsubscribe_demand(self: ProbeProtocol, subscription_id: str) -> None: - """Call unsubscribe demand on the requestor market api.""" + """Call unsubscribe demand on the market api.""" await self.api.market.unsubscribe_demand(subscription_id) + @step() + async def wait_for_approval(self: ProbeProtocol, agreement_id: str) -> None: + """Call wait_for_approval on the market api.""" + await self.api.market.wait_for_approval(agreement_id) + @step() async def wait_for_proposals( self: ProbeProtocol, @@ -119,7 +189,7 @@ async def wait_for_proposals( providers: Sequence["Probe"], filter: Optional[Callable[[Proposal], bool]] = lambda p: True, ) -> List[Proposal]: - """Call collect_offers on the requestor market api. + """Call collect_offers on the market api. Polls collect_offers continously until an offer from each of the given providers is received. Returns a list of the collected proposals. @@ -151,67 +221,13 @@ async def wait_for_proposals( return proposals - @step() - async def counter_proposal( - self: ProbeProtocol, - subscription_id: str, - demand: Demand, - provider_proposal: Proposal, - ) -> str: - """Call counter_proposal_demand on the requestor market api.""" - - proposal = DemandOfferBase( - constraints=demand.constraints, - properties=demand.properties, - ) - - counter_proposal = await self.api.market.counter_proposal_demand( - subscription_id=subscription_id, - proposal_id=provider_proposal.proposal_id, - demand_offer_base=proposal, - ) - - return counter_proposal - - @step() - async def create_agreement(self: ProbeProtocol, proposal: Proposal) -> str: - """Call create_agreement on the requestor market api.""" - - valid_to = str(datetime.utcnow() + timedelta(days=1)) + "Z" - logger.debug( - "Creating agreement, proposal_id=%s, valid_to=%s", - proposal.proposal_id, - valid_to, - ) - agreement_proposal = AgreementProposal( - proposal_id=proposal.proposal_id, valid_to=valid_to - ) - - agreement_id = await self.api.market.create_agreement(agreement_proposal) - return agreement_id - - @step() - async def confirm_agreement(self: ProbeProtocol, agreement_id: str) -> None: - """Call confirm_agreement on the requestor market api.""" - await self.api.market.confirm_agreement(agreement_id) - - @step() - async def wait_for_approval(self: ProbeProtocol, agreement_id: str) -> None: - """Call wait_for_approval on the requestor market api.""" - await self.api.market.wait_for_approval(agreement_id) - @step() - async def terminate_agreement( - self: ProbeProtocol, agreement_id: str, reason: Optional[str] - ): - """Call terminate_agreement on the requestor market api.""" - await self.api.market.terminate_agreement( - agreement_id, request_body={"message": "Terminated by requestor"} - ) +class PaymentApiMixin: + """Probe mixin providing high-level test steps which use yagna payment API.""" @step() async def gather_invoices(self: ProbeProtocol, agreement_id: str) -> List[Invoice]: - """Call gather_invoice on the requestor payment api.""" + """Call gather_invoice on the payment api.""" invoices: List[Invoice] = [] @@ -226,7 +242,7 @@ async def gather_invoices(self: ProbeProtocol, agreement_id: str) -> List[Invoic async def pay_invoices( self: ProbeProtocol, invoice_events: Iterable[Invoice] ) -> None: - """Call accept_invoice on the requestor payment api.""" + """Call accept_invoice on the payment api.""" for invoice_event in invoice_events: allocation = Allocation( @@ -247,38 +263,51 @@ async def pay_invoices( logger.debug("Accepted invoice. id=%s", invoice_event.invoice_id) +class ProviderProbeProtocol(ProbeProtocol, Protocol): + """Protocol class representing `ProviderProbe` class. + + This is mainly to fix mypy errors when used as `self` in mixins. + """ + + provider_agent: "ProviderAgentComponent" + """The agent component running `ya-provider` for this probe. + This field is added for convenience to make getting this agent instance easier.""" + + class ProviderLogMixin: - """A provider probe with steps that wait for specific messages in agent logs.""" + """Provider probe mixin which adds step functions that wait for specific logs.""" @step() - async def wait_for_offer_subscribed(self): + async def wait_for_offer_subscribed(self: ProviderProbeProtocol): """Wait until the provider agent subscribes to the offer.""" - await self._wait_for_agent_log("Subscribed offer") + await self.provider_agent.wait_for_log("Subscribed offer") @step() - async def wait_for_proposal_accepted(self): + async def wait_for_proposal_accepted(self: ProviderProbeProtocol): """Wait until the provider agent subscribes to the offer.""" - await self._wait_for_agent_log("Decided to CounterProposal") + await self.provider_agent.wait_for_log("Decided to CounterProposal") @step() - async def wait_for_agreement_approved(self): + async def wait_for_agreement_approved(self: ProviderProbeProtocol): """Wait until the provider agent subscribes to the offer.""" - await self._wait_for_agent_log("Decided to ApproveAgreement") + await self.provider_agent.wait_for_log("Decided to ApproveAgreement") @step() - async def wait_for_exeunit_started(self): + async def wait_for_exeunit_started(self: ProviderProbeProtocol): """Wait until the provider agent starts the exe-unit.""" - await self._wait_for_agent_log(r"(.*)\[ExeUnit\](.+)Supervisor initialized$") + await self.provider_agent.wait_for_log( + r"(.*)\[ExeUnit\](.+)Supervisor initialized$" + ) @step() - async def wait_for_exeunit_finished(self): + async def wait_for_exeunit_finished(self: ProviderProbeProtocol): """Wait until exe-unit finishes.""" - await self._wait_for_agent_log( + await self.provider_agent.wait_for_log( "ExeUnit process exited with status Finished - exit code: 0" ) @step() - async def wait_for_agreement_terminated(self): + async def wait_for_agreement_terminated(self: ProviderProbeProtocol): """Wait until Agreement will be terminated. This can happen for 2 reasons (both caught by this function): @@ -287,22 +316,22 @@ async def wait_for_agreement_terminated(self): versions of API without `terminate` endpoint implemented. Moreover Provider can terminate, because Agreements condition where broken. """ - await self._wait_for_agent_log(r"Agreement \[.*\] terminated by") + await self.provider_agent.wait_for_log(r"Agreement \[.*\] terminated by") @step() - async def wait_for_agreement_cleanup(self): + async def wait_for_agreement_cleanup(self: ProviderProbeProtocol): """Wait until Provider will cleanup all allocated resources. This can happen before or after Agreement terminated log will be printed. """ - await self._wait_for_agent_log(r"Agreement \[.*\] cleanup finished.") + await self.provider_agent.wait_for_log(r"Agreement \[.*\] cleanup finished.") @step() - async def wait_for_invoice_sent(self): + async def wait_for_invoice_sent(self: ProviderProbeProtocol): """Wait until the invoice is sent.""" - await self._wait_for_agent_log("Invoice (.+) sent") + await self.provider_agent.wait_for_log("Invoice (.+) sent") @step(default_timeout=300) - async def wait_for_invoice_paid(self): + async def wait_for_invoice_paid(self: ProviderProbeProtocol): """Wait until the invoice is paid.""" - await self._wait_for_agent_log("Invoice .+? for agreement .+? was paid") + await self.provider_agent.wait_for_log("Invoice .+? for agreement .+? was paid") diff --git a/goth/runner/provider.py b/goth/runner/provider.py deleted file mode 100644 index 650a3235d..000000000 --- a/goth/runner/provider.py +++ /dev/null @@ -1,65 +0,0 @@ -"""`ProviderProbe` subclasses for controlling provider nodes.""" - -from goth.runner import step -from goth.runner.probe import ProviderProbe - - -class ProviderProbeWithLogSteps(ProviderProbe): - """A provider probe with steps that wait for specific messages in agent logs.""" - - @step() - async def wait_for_offer_subscribed(self): - """Wait until the provider agent subscribes to the offer.""" - await self.agents[0].wait_for_log("Subscribed offer") - - @step() - async def wait_for_proposal_accepted(self): - """Wait until the provider agent subscribes to the offer.""" - await self.agents[0].wait_for_log("Decided to CounterProposal") - - @step() - async def wait_for_agreement_approved(self): - """Wait until the provider agent subscribes to the offer.""" - await self.agents[0].wait_for_log("Decided to ApproveAgreement") - - @step() - async def wait_for_exeunit_started(self): - """Wait until the provider agent starts the exe-unit.""" - await self.agents[0].wait_for_log(r"(.*)\[ExeUnit\](.+)Supervisor initialized$") - - @step() - async def wait_for_exeunit_finished(self): - """Wait until exe-unit finishes.""" - await self.agents[0].wait_for_log( - "ExeUnit process exited with status Finished - exit code: 0" - ) - - @step() - async def wait_for_agreement_terminated(self): - """Wait until Agreement will be terminated. - - This can happen for 2 reasons (both caught by this function): - - Requestor terminates - most common case - - Provider terminates - it happens for compatibility with previous - versions of API without `terminate` endpoint implemented. Moreover - Provider can terminate, because Agreements condition where broken. - """ - await self.agents[0].wait_for_log(r"Agreement \[.*\] terminated by") - - @step() - async def wait_for_agreement_cleanup(self): - """Wait until Provider will cleanup all allocated resources. - - This can happen before or after Agreement terminated log will be printed. - """ - await self.agents[0].wait_for_log(r"Agreement \[.*\] cleanup finished.") - - @step() - async def wait_for_invoice_sent(self): - """Wait until the invoice is sent.""" - await self.agents[0].wait_for_log("Invoice (.+) sent") - - @step(default_timeout=300) - async def wait_for_invoice_paid(self): - """Wait until the invoice is paid.""" - await self.agents[0].wait_for_log("Invoice .+? for agreement .+? was paid") diff --git a/goth/runner/requestor.py b/goth/runner/requestor.py deleted file mode 100644 index f68a64ab0..000000000 --- a/goth/runner/requestor.py +++ /dev/null @@ -1,246 +0,0 @@ -"""`RequestorProbe` subclasses for controlling requestor nodes.""" - -import asyncio -from datetime import datetime, timedelta -import logging -from typing import Callable, Iterable, List, Optional, Sequence, Tuple - -from ya_activity import ExeScriptCommandResult, ExeScriptRequest -from ya_market import AgreementProposal, Demand, DemandOfferBase, Proposal -from ya_payment import Acceptance, Allocation, Invoice - -from goth.node import DEFAULT_SUBNET -from goth.runner import step -from goth.runner.probe import Probe, RequestorProbe - - -logger = logging.getLogger(__name__) - - -class ActivityOperationsMixin: - """Provides high-level steps that rely on Yagna Activity API.""" - - @step() - async def create_activity(self: RequestorProbe, agreement_id: str) -> str: - """Call create_activity on the requestor activity api.""" - - activity_id = await self.activity.control.create_activity(agreement_id) - return activity_id - - @step() - async def call_exec(self: RequestorProbe, activity_id: str, exe_script: str) -> str: - """Call call_exec on the requestor activity api.""" - - script_request = ExeScriptRequest(exe_script) - batch_id = await self.activity.control.call_exec(activity_id, script_request) - return batch_id - - @step() - async def collect_results( - self: RequestorProbe, activity_id: str, batch_id: str, num_results: int - ) -> List[ExeScriptCommandResult]: - """Call collect_results on the requestor activity api.""" - - results: List[ExeScriptCommandResult] = [] - - while len(results) < num_results: - results = await self.activity.control.get_exec_batch_results( - activity_id, batch_id - ) - await asyncio.sleep(1.0) - return results - - @step() - async def destroy_activity(self: RequestorProbe, activity_id: str) -> None: - """Call destroy_activity on the requestor activity api.""" - - await self.activity.control.destroy_activity(activity_id) - - -class MarketOperationsMixin: - """Provides high-level steps that rely on Yagna Market API.""" - - @step() - async def subscribe_demand( - self: RequestorProbe, demand: Demand - ) -> Tuple[str, Demand]: - """Call subscribe demand on the requestor market api.""" - subscription_id = await self.market.subscribe_demand(demand) - return subscription_id, demand - - @step() - async def subscribe_template_demand( - self: RequestorProbe, task_package: str, constraints: str - ) -> Tuple[str, Demand]: - """Build Demand from template and call subscribe demand on market api.""" - - demand = DemandOfferBase( - properties={ - "golem.node.id.name": "test1", - "golem.srv.comp.expiration": int( - (datetime.now() + timedelta(minutes=10)).timestamp() * 1000 - ), - "golem.srv.comp.task_package": task_package, - "golem.node.debug.subnet": DEFAULT_SUBNET, - }, - constraints=constraints, - ) - - return await self.subscribe_demand(demand) - - @step() - async def unsubscribe_demand(self: RequestorProbe, subscription_id: str) -> None: - """Call unsubscribe demand on the requestor market api.""" - await self.market.unsubscribe_demand(subscription_id) - - @step() - async def wait_for_proposals( - self: RequestorProbe, - subscription_id: str, - providers: Sequence[Probe], - filter: Optional[Callable[[Proposal], bool]] = lambda p: True, - ) -> List[Proposal]: - """Call collect_offers on the requestor market api. - - Polls collect_offers continously until an offer from each of the given - providers is received. Returns a list of the collected proposals. - """ - proposals: List[Proposal] = [] - provider_ids = {p.address for p in providers} - - while len(proposals) < len(provider_ids): - collected_offers = await self.market.collect_offers(subscription_id) - if collected_offers: - logger.debug( - "collect_offers(%s). collected_offers=%r", - subscription_id, - collected_offers, - ) - collected_proposals = [ - offer.proposal - for offer in collected_offers - if ( - offer.proposal.issuer_id in provider_ids - and filter(offer.proposal) - ) - ] - proposals.extend(collected_proposals) - else: - logger.debug( - "Waiting for proposals. subscription_id=%s", subscription_id - ) - - return proposals - - @step() - async def counter_proposal( - self: RequestorProbe, - subscription_id: str, - demand: Demand, - provider_proposal: Proposal, - ) -> str: - """Call counter_proposal_demand on the requestor market api.""" - - proposal = DemandOfferBase( - constraints=demand.constraints, - properties=demand.properties, - ) - - counter_proposal = await self.market.counter_proposal_demand( - subscription_id=subscription_id, - proposal_id=provider_proposal.proposal_id, - demand_offer_base=proposal, - ) - - return counter_proposal - - @step() - async def create_agreement(self: RequestorProbe, proposal: Proposal) -> str: - """Call create_agreement on the requestor market api.""" - - valid_to = str(datetime.utcnow() + timedelta(days=1)) + "Z" - logger.debug( - "Creating agreement, proposal_id=%s, valid_to=%s", - proposal.proposal_id, - valid_to, - ) - agreement_proposal = AgreementProposal( - proposal_id=proposal.proposal_id, valid_to=valid_to - ) - - agreement_id = await self.market.create_agreement(agreement_proposal) - return agreement_id - - @step() - async def confirm_agreement(self: RequestorProbe, agreement_id: str) -> None: - """Call confirm_agreement on the requestor market api.""" - await self.market.confirm_agreement(agreement_id) - - @step() - async def wait_for_approval(self: RequestorProbe, agreement_id: str) -> None: - """Call wait_for_approval on the requestor market api.""" - await self.market.wait_for_approval(agreement_id) - - @step() - async def terminate_agreement( - self: RequestorProbe, agreement_id: str, reason: Optional[str] - ): - """Call terminate_agreement on the requestor market api.""" - await self.market.terminate_agreement( - agreement_id, request_body={"message": "Terminated by requestor"} - ) - - -class PaymentOperationsMixin: - """Provides high-level steps that rely on Yagna Payment API.""" - - @step() - async def gather_invoices(self: RequestorProbe, agreement_id: str) -> List[Invoice]: - """Call gather_invoice on the requestor payment api.""" - - invoices: List[Invoice] = [] - - while not invoices: - await asyncio.sleep(2.0) - invoices = await self.payment.get_invoices() - invoices = [inv for inv in invoices if inv.agreement_id == agreement_id] - - return invoices - - @step() - async def pay_invoices( - self: RequestorProbe, invoice_events: Iterable[Invoice] - ) -> None: - """Call accept_invoice on the requestor payment api.""" - - for invoice_event in invoice_events: - allocation = Allocation( - allocation_id="", - total_amount=invoice_event.amount, - spent_amount=0, - remaining_amount=0, - make_deposit=True, - ) - allocation_result = await self.payment.create_allocation(allocation) - logger.debug("Created allocation. id=%s", allocation_result) - - acceptance = Acceptance( - total_amount_accepted=invoice_event.amount, - allocation_id=allocation_result.allocation_id, - ) - await self.payment.accept_invoice(invoice_event.invoice_id, acceptance) - logger.debug("Accepted invoice. id=%s", invoice_event.invoice_id) - - -class RequestorProbeWithApiSteps( - RequestorProbe, - ActivityOperationsMixin, - MarketOperationsMixin, - PaymentOperationsMixin, -): - """A testing interface for a Yagna requestor node, with all bells and whistles. - - This includes activity, market and payment API clients which can be used to - directly control the requestor daemon, and higher-level steps that use those - clients for making API calls. - """ diff --git a/test/yagna/e2e/test_e2e_vm.py b/test/yagna/e2e/test_e2e_vm.py index 7ea694d62..371a521ac 100644 --- a/test/yagna/e2e/test_e2e_vm.py +++ b/test/yagna/e2e/test_e2e_vm.py @@ -16,8 +16,7 @@ from goth.runner import Runner from goth.runner.container.payment import PaymentIdPool from goth.runner.container.yagna import YagnaContainerConfig -from goth.runner.provider import ProviderProbeWithLogSteps -from goth.runner.requestor import RequestorProbeWithApiSteps +from goth.runner.probe import ProviderProbe, RequestorProbe logger = logging.getLogger(__name__) @@ -48,21 +47,21 @@ def _topology( return [ YagnaContainerConfig( name="requestor", - probe_type=RequestorProbeWithApiSteps, + probe_type=RequestorProbe, volumes={assets_path / "requestor": "/asset"}, environment=requestor_env, payment_id=payment_id_pool.get_id(), ), YagnaContainerConfig( name="provider_1", - probe_type=ProviderProbeWithLogSteps, + probe_type=ProviderProbe, environment=provider_env, volumes=provider_volumes, privileged_mode=True, ), YagnaContainerConfig( name="provider_2", - probe_type=ProviderProbeWithLogSteps, + probe_type=ProviderProbe, environment=provider_env, volumes=provider_volumes, privileged_mode=True, @@ -128,8 +127,8 @@ async def test_e2e_vm_success( exe_script = _exe_script(runner, output_file) - requestor = runner.get_probes(probe_type=RequestorProbeWithApiSteps)[0] - providers = runner.get_probes(probe_type=ProviderProbeWithLogSteps) + requestor = runner.get_probes(probe_type=RequestorProbe)[0] + providers = runner.get_probes(probe_type=ProviderProbe) # Market diff --git a/test/yagna/e2e/test_e2e_wasm.py b/test/yagna/e2e/test_e2e_wasm.py index 3878410c5..201942e7c 100644 --- a/test/yagna/e2e/test_e2e_wasm.py +++ b/test/yagna/e2e/test_e2e_wasm.py @@ -15,8 +15,7 @@ from goth.runner import Runner from goth.runner.container.payment import PaymentIdPool from goth.runner.container.yagna import YagnaContainerConfig -from goth.runner.provider import ProviderProbeWithLogSteps -from goth.runner.requestor import RequestorProbeWithApiSteps +from goth.runner.probe import ProviderProbe, RequestorProbe logger = logging.getLogger(__name__) @@ -41,14 +40,14 @@ def _topology( return [ YagnaContainerConfig( name="requestor", - probe_type=RequestorProbeWithApiSteps, + probe_type=RequestorProbe, volumes={assets_path / "requestor": "/asset"}, environment=requestor_env, payment_id=payment_id_pool.get_id(), ), YagnaContainerConfig( name="provider_1", - probe_type=ProviderProbeWithLogSteps, + probe_type=ProviderProbe, environment=provider_env, # https://github.com/golemfactory/goth/issues/410 privileged_mode=True, @@ -56,7 +55,7 @@ def _topology( ), YagnaContainerConfig( name="provider_2", - probe_type=ProviderProbeWithLogSteps, + probe_type=ProviderProbe, environment=provider_env, # https://github.com/golemfactory/goth/issues/410 privileged_mode=True, @@ -79,8 +78,8 @@ async def test_e2e_wasm_success( topology = _topology(assets_path, payment_id_pool) async with runner(topology): - requestor = runner.get_probes(probe_type=RequestorProbeWithApiSteps)[0] - providers = runner.get_probes(probe_type=ProviderProbeWithLogSteps) + requestor = runner.get_probes(probe_type=RequestorProbe)[0] + providers = runner.get_probes(probe_type=ProviderProbe) # Market diff --git a/test/yagna/helpers/negotiation.py b/test/yagna/helpers/negotiation.py index aab9afa1a..1098372d9 100644 --- a/test/yagna/helpers/negotiation.py +++ b/test/yagna/helpers/negotiation.py @@ -7,9 +7,7 @@ from ya_market import Demand, DemandOfferBase, Proposal from goth.node import DEFAULT_SUBNET -from goth.runner.probe import RequestorProbe -from goth.runner.provider import ProviderProbeWithLogSteps -from goth.runner.requestor import RequestorProbeWithApiSteps +from goth.runner.probe import ProviderProbe, RequestorProbe logger = logging.getLogger(__name__) @@ -18,7 +16,7 @@ class DemandBuilder: """Helper for building custom Demands. - Use if RequestorProbeWithApiSteps.subscribe_template_demand function + Use if RequestorProbe.subscribe_template_demand function is not enough for you. """ @@ -65,11 +63,11 @@ def build(self) -> DemandOfferBase: async def negotiate_agreements( - requestor: RequestorProbeWithApiSteps, + requestor: RequestorProbe, demand: Demand, - providers: List[ProviderProbeWithLogSteps], + providers: List[ProviderProbe], proposal_filter: Optional[Callable[[Proposal], bool]] = lambda p: True, -) -> List[Tuple[str, ProviderProbeWithLogSteps]]: +) -> List[Tuple[str, ProviderProbe]]: """Negotiate agreements with supplied providers. Use negotiate_agreements function, when you don't need any custom negotiation diff --git a/test/yagna/helpers/payment.py b/test/yagna/helpers/payment.py index a3306fa46..a6a5e8896 100644 --- a/test/yagna/helpers/payment.py +++ b/test/yagna/helpers/payment.py @@ -2,13 +2,12 @@ from typing import List, Tuple -from goth.runner.provider import ProviderProbeWithLogSteps -from goth.runner.requestor import RequestorProbeWithApiSteps +from goth.runner.probe import ProviderProbe, RequestorProbe async def pay_all( - requestor: RequestorProbeWithApiSteps, - agreements: List[Tuple[str, ProviderProbeWithLogSteps]], + requestor: RequestorProbe, + agreements: List[Tuple[str, ProviderProbe]], ): """Pay for all Agreements.""" for agreement_id, provider in agreements: diff --git a/test/yagna/interactive/test_interactive_vm.py b/test/yagna/interactive/test_interactive_vm.py index 84a85a770..c89a42b1f 100644 --- a/test/yagna/interactive/test_interactive_vm.py +++ b/test/yagna/interactive/test_interactive_vm.py @@ -21,8 +21,7 @@ from goth.runner import Runner from goth.runner.container.payment import PaymentIdPool from goth.runner.container.yagna import YagnaContainerConfig -from goth.runner.probe import RequestorProbe -from goth.runner.provider import ProviderProbeWithLogSteps +from goth.runner.probe import ProviderProbe, RequestorProbe logger = logging.getLogger(__name__) @@ -70,14 +69,14 @@ def _topology( ), YagnaContainerConfig( name="provider_1", - probe_type=ProviderProbeWithLogSteps, + probe_type=ProviderProbe, environment=provider_env, volumes=provider_volumes, privileged_mode=True, ), YagnaContainerConfig( name="provider_2", - probe_type=ProviderProbeWithLogSteps, + probe_type=ProviderProbe, environment=provider_env, volumes=provider_volumes, privileged_mode=True, @@ -142,19 +141,20 @@ async def test_interactive_vm( async with runner(topology): - providers = runner.get_probes(probe_type=ProviderProbeWithLogSteps) + providers = runner.get_probes(probe_type=ProviderProbe) requestor = runner.get_probes(probe_type=RequestorProbe)[0] # Some test steps may be included in the interactive test as well for provider in providers: await provider.wait_for_offer_subscribed(timeout=10) + subnet = providers[0].provider_agent.subnet print("\n\033[33;1mNow run your requestor agent as follows:\n") print( f"$ YAGNA_APPKEY={requestor.app_key} " f"YAGNA_API_URL=http://{requestor.ip_address}:{YAGNA_REST_PORT} " f"GSB_URL=tcp://{requestor.ip_address}:{YAGNA_BUS_PORT} " - f"examples/blender/blender.py --subnet {providers[0].subnet}" + f"examples/blender/blender.py --subnet {subnet}" ) print("\nPress Ctrl+C at any moment to stop the test harness.\033[0m\n") diff --git a/test/yagna/module/payments/test_zero_amount_txs.py b/test/yagna/module/payments/test_zero_amount_txs.py index 9e1bfb6ab..62aa5c641 100644 --- a/test/yagna/module/payments/test_zero_amount_txs.py +++ b/test/yagna/module/payments/test_zero_amount_txs.py @@ -14,8 +14,7 @@ from goth.runner import Runner from goth.runner.container.payment import PaymentIdPool from goth.runner.container.yagna import YagnaContainerConfig -from goth.runner.provider import ProviderProbeWithLogSteps -from goth.runner.requestor import RequestorProbeWithApiSteps +from goth.runner.probe import ProviderProbe, RequestorProbe from ya_payment import InvoiceStatus @@ -42,14 +41,14 @@ def _topology( return [ YagnaContainerConfig( name="requestor", - probe_type=RequestorProbeWithApiSteps, + probe_type=RequestorProbe, volumes={assets_path / "requestor": "/asset"}, environment=requestor_env, payment_id=payment_id_pool.get_id(), ), YagnaContainerConfig( name="provider", - probe_type=ProviderProbeWithLogSteps, + probe_type=ProviderProbe, environment=provider_env, # https://github.com/golemfactory/goth/issues/410 privileged_mode=True, @@ -72,8 +71,8 @@ async def test_zero_amount_invoice_is_settled( topology = _topology(assets_path, payment_id_pool) async with runner(topology): - requestor = runner.get_probes(probe_type=RequestorProbeWithApiSteps)[0] - provider = runner.get_probes(probe_type=ProviderProbeWithLogSteps)[0] + requestor = runner.get_probes(probe_type=RequestorProbe)[0] + provider = runner.get_probes(probe_type=ProviderProbe)[0] # Market diff --git a/test/yagna/module/ya-provider/test_provider_multi_activity.py b/test/yagna/module/ya-provider/test_provider_multi_activity.py index e78ecb3cd..bbc832028 100644 --- a/test/yagna/module/ya-provider/test_provider_multi_activity.py +++ b/test/yagna/module/ya-provider/test_provider_multi_activity.py @@ -15,8 +15,7 @@ from goth.runner import Runner from goth.runner.container.payment import PaymentIdPool from goth.runner.container.yagna import YagnaContainerConfig -from goth.runner.provider import ProviderProbeWithLogSteps -from goth.runner.requestor import RequestorProbeWithApiSteps +from goth.runner.probe import ProviderProbe, RequestorProbe from test.yagna.helpers.negotiation import negotiate_agreements, DemandBuilder from test.yagna.helpers.payment import pay_all @@ -46,14 +45,14 @@ def _topology( return [ YagnaContainerConfig( name="requestor", - probe_type=RequestorProbeWithApiSteps, + probe_type=RequestorProbe, volumes={assets_path / "requestor": "/asset"}, environment=requestor_env, payment_id=payment_id_pool.get_id(), ), YagnaContainerConfig( name="provider_1", - probe_type=ProviderProbeWithLogSteps, + probe_type=ProviderProbe, environment=provider_env, payment_id=payment_id_pool.get_id(), volumes=provider_volumes, @@ -77,8 +76,8 @@ async def test_provider_multi_activity( """Test provider handling multiple activities in single Agreement.""" async with runner(_topology(assets_path, payment_id_pool)): - requestor = runner.get_probes(probe_type=RequestorProbeWithApiSteps)[0] - providers = runner.get_probes(probe_type=ProviderProbeWithLogSteps) + requestor = runner.get_probes(probe_type=RequestorProbe)[0] + providers = runner.get_probes(probe_type=ProviderProbe) # Market task_package = task_package_template.format( From f0858a416379cfccf9117409e7cfe49eaa372f13 Mon Sep 17 00:00:00 2001 From: Kuba Mazurek Date: Fri, 26 Mar 2021 15:33:45 +0100 Subject: [PATCH 06/13] Add more logging to runner exit stack calls --- goth/runner/__init__.py | 3 --- goth/runner/container/compose.py | 2 ++ goth/runner/probe/__init__.py | 5 +++++ goth/runner/proxy.py | 7 ++++++- goth/runner/web_server.py | 2 ++ 5 files changed, 15 insertions(+), 4 deletions(-) diff --git a/goth/runner/__init__.py b/goth/runner/__init__.py index 6cf80b003..308891491 100644 --- a/goth/runner/__init__.py +++ b/goth/runner/__init__.py @@ -150,9 +150,6 @@ def _create_probes(self, scenario_dir: Path) -> None: docker_client = docker.from_env() for config in self._topology: - logger.debug( - "Creating probe. config=%s, probe_type=%s", config, config.probe_type - ) log_config = config.log_config or LogConfig(config.name) log_config.base_dir = scenario_dir diff --git a/goth/runner/container/compose.py b/goth/runner/container/compose.py index c8d7779c9..0437ac7f5 100644 --- a/goth/runner/container/compose.py +++ b/goth/runner/container/compose.py @@ -199,7 +199,9 @@ async def run_compose_network( """Implement AsyncContextManager for starting/stopping docker compose network.""" try: + logger.debug("Starting compose network. log_dir=%s, force_build=%s", log_dir, force_build) await compose_manager.start_network(log_dir, force_build) yield finally: + logger.debug("Stopping compose network") await compose_manager.stop_network() diff --git a/goth/runner/probe/__init__.py b/goth/runner/probe/__init__.py index d980ac363..3de7b1453 100644 --- a/goth/runner/probe/__init__.py +++ b/goth/runner/probe/__init__.py @@ -325,12 +325,16 @@ def create_probe( probe: Optional[Probe] = None try: + logger.debug( + "Creating probe. config=%s, probe_type=%s", config, config.probe_type + ) probe = config.probe_type(runner, docker_client, config, log_config) for name, value in config.probe_properties.items(): probe.__setattr__(name, value) yield probe finally: if probe: + logger.debug("Removing probe. name=%s", probe.name) probe.remove() @@ -339,6 +343,7 @@ async def run_probe(probe: Probe) -> AsyncIterator[str]: """Implement AsyncContextManager for starting and stopping a probe.""" try: + logger.debug("Starting probe. name=%s", probe.name) await probe.start() assert probe.ip_address yield probe.ip_address diff --git a/goth/runner/proxy.py b/goth/runner/proxy.py index a857d8350..a38605bfc 100644 --- a/goth/runner/proxy.py +++ b/goth/runner/proxy.py @@ -21,6 +21,9 @@ mitmproxy.utils.debug.register_info_dumpers = lambda *args: None +logger = logging.getLogger(__name__) + + class Proxy: """Proxy using mitmproxy to generate events out of http calls.""" @@ -104,10 +107,12 @@ def start(inner_self): @contextlib.asynccontextmanager async def run_proxy(proxy: Proxy) -> AsyncIterator[Proxy]: - """Implement AsyncContextManager protocol for stating and stopping a Proxy.""" + """Implement AsyncContextManager protocol for starting and stopping a Proxy.""" try: + logger.debug("Starting mitmproxy") proxy.start() yield finally: + logger.debug("Stopping mitmproxy") await proxy.stop() diff --git a/goth/runner/web_server.py b/goth/runner/web_server.py index 283549d84..87b5ae792 100644 --- a/goth/runner/web_server.py +++ b/goth/runner/web_server.py @@ -90,7 +90,9 @@ async def run_web_server(server: WebServer, server_address: Optional[str]) -> No """Implement AsyncContextManager protocol for starting/stopping a web server.""" try: + logger.debug("Starting web server. address=%s", server_address) await server.start(server_address) yield finally: + logger.debug("Stopping web server. address=%s", server_address) await server.stop() From 55c178aaa6926d3c70268cfc226e6e41d051018f Mon Sep 17 00:00:00 2001 From: Kuba Mazurek Date: Fri, 26 Mar 2021 15:42:59 +0100 Subject: [PATCH 07/13] Black formatting --- goth/runner/container/compose.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/goth/runner/container/compose.py b/goth/runner/container/compose.py index 0437ac7f5..e14d4c8e6 100644 --- a/goth/runner/container/compose.py +++ b/goth/runner/container/compose.py @@ -199,7 +199,9 @@ async def run_compose_network( """Implement AsyncContextManager for starting/stopping docker compose network.""" try: - logger.debug("Starting compose network. log_dir=%s, force_build=%s", log_dir, force_build) + logger.debug( + "Starting compose network. log_dir=%s, force_build=%s", log_dir, force_build + ) await compose_manager.start_network(log_dir, force_build) yield finally: From 44cd5fa3b4fc1d25a32d75365c15299d37efd63c Mon Sep 17 00:00:00 2001 From: Kuba Mazurek Date: Fri, 26 Mar 2021 17:38:59 +0100 Subject: [PATCH 08/13] Initalise log monitor in AgentComponent __init__ --- goth/runner/probe/__init__.py | 6 +----- goth/runner/probe/agent.py | 3 ++- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/goth/runner/probe/__init__.py b/goth/runner/probe/__init__.py index 3de7b1453..08456424d 100644 --- a/goth/runner/probe/__init__.py +++ b/goth/runner/probe/__init__.py @@ -352,11 +352,7 @@ async def run_probe(probe: Probe) -> AsyncIterator[str]: class RequestorProbe(ActivityApiMixin, MarketApiMixin, PaymentApiMixin, Probe): - """A requestor probe that can make calls to Yagna REST APIs. - - This class is used in Level 1 scenarios and as a type of `self` - argument for `Market/Payment/ActivityOperationsMixin` methods. - """ + """A probe subclass with activity API steps and requestor payment init.""" async def _start_container(self) -> None: await super()._start_container() diff --git a/goth/runner/probe/agent.py b/goth/runner/probe/agent.py index 0515acb27..bacf278b4 100644 --- a/goth/runner/probe/agent.py +++ b/goth/runner/probe/agent.py @@ -36,10 +36,11 @@ class AgentComponent(ProbeComponent, abc.ABC): def __init__(self, probe: "Probe", name: str): super().__init__(probe) self.name = name + self._init_log_monitor() + @abc.abstractmethod async def start(self, *args, **kwargs): """Start the agent binary and initialize the internal log monitor.""" - self._init_log_monitor() async def stop(self, *args, **kwargs): """Stop the agent binary and its log monitor.""" From 3af6bb36a270463263259f523a5764357b94e479 Mon Sep 17 00:00:00 2001 From: Kuba Mazurek Date: Fri, 26 Mar 2021 17:47:39 +0100 Subject: [PATCH 09/13] Update agent unit tests --- test/goth/runner/probe/test_agent.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/test/goth/runner/probe/test_agent.py b/test/goth/runner/probe/test_agent.py index 1e56de158..0ab29f474 100644 --- a/test/goth/runner/probe/test_agent.py +++ b/test/goth/runner/probe/test_agent.py @@ -6,19 +6,27 @@ from goth.runner.probe.agent import AgentComponent -def test_agent_duplicate_names(): - """Test if two agent objects with the same name will be treated as equal.""" +class MockAgentComponent(AgentComponent): + """`AgentComponent` implementation used for testing.""" - class TestAgentComponent(AgentComponent): + def start(self, *args, **kwargs): + """Abstract method implementation.""" pass + def _init_log_monitor(self): + pass + + +def test_agent_duplicate_names(): + """Test if two agent objects with the same name will be treated as equal.""" + agents = set() agent_name = "test_agent" probe = MagicMock(spec=Probe) - first_agent = TestAgentComponent(probe, agent_name) + first_agent = MockAgentComponent(probe, agent_name) agents.add(first_agent) - agents.add(TestAgentComponent(probe, agent_name)) + agents.add(MockAgentComponent(probe, agent_name)) assert len(agents) == 1 assert agents.pop() == first_agent @@ -27,13 +35,10 @@ class TestAgentComponent(AgentComponent): def test_agent_different_names(): """Test if two agent objects with different names will be treated as different.""" - class TestAgentComponent(AgentComponent): - pass - agents = set() probe = MagicMock(spec=Probe) - agents.add(TestAgentComponent(probe, "agent_1")) - agents.add(TestAgentComponent(probe, "agent_2")) + agents.add(MockAgentComponent(probe, "agent_1")) + agents.add(MockAgentComponent(probe, "agent_2")) assert len(agents) == 2 From e450ea99e1575c2baa7eddf7346c22f2b0a09f51 Mon Sep 17 00:00:00 2001 From: Kuba Mazurek Date: Fri, 26 Mar 2021 18:04:18 +0100 Subject: [PATCH 10/13] Fix getting probe agents in Runner Co-authored-by: azawlocki --- goth/runner/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/goth/runner/__init__.py b/goth/runner/__init__.py index 308891491..620b05427 100644 --- a/goth/runner/__init__.py +++ b/goth/runner/__init__.py @@ -126,9 +126,7 @@ def get_probes( def check_assertion_errors(self) -> None: """If any monitor reports an assertion error, raise the first error.""" - probe_agents = [] - for probe in self.probes: - probe_agents.extend(chain(probe.agents)) + probe_agents = chain(*(probe.agents for probe in self.probes)) monitors = chain.from_iterable( ( From 5a4a7b8d552fe8a8dd88dfb4be2d3f7332bce1c7 Mon Sep 17 00:00:00 2001 From: Kuba Mazurek Date: Tue, 30 Mar 2021 14:53:56 +0200 Subject: [PATCH 11/13] Add start_agents method to Probe --- goth/runner/__init__.py | 4 +--- goth/runner/probe/__init__.py | 5 +++++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/goth/runner/__init__.py b/goth/runner/__init__.py index 620b05427..74d5ce416 100644 --- a/goth/runner/__init__.py +++ b/goth/runner/__init__.py @@ -199,9 +199,7 @@ async def _start_nodes(self): await self._exit_stack.enter_async_context(run_proxy(self.proxy)) # Collect all agent enabled probes and start them in parallel - awaitables = [] - for probe in self.probes: - awaitables.extend([a.start() for a in probe.agents]) + awaitables = [probe.start_agents() for probe in self.probes] await asyncio.gather(*awaitables) @property diff --git a/goth/runner/probe/__init__.py b/goth/runner/probe/__init__.py index 08456424d..4dcb54be4 100644 --- a/goth/runner/probe/__init__.py +++ b/goth/runner/probe/__init__.py @@ -160,6 +160,11 @@ async def start(self) -> None: await self._start_container() self.api = RestApiComponent(self) + async def start_agents(self): + """Start all of the probe's agents.""" + for agent in self.agents: + await agent.start() + async def stop(self): """ Stop the probe, removing the Docker container of the daemon being tested. From b365b294698f74a8ea5940d6dd68166afd12847f Mon Sep 17 00:00:00 2001 From: Kuba Mazurek Date: Tue, 30 Mar 2021 15:10:39 +0200 Subject: [PATCH 12/13] Review comments in agent.py --- goth/runner/probe/agent.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/goth/runner/probe/agent.py b/goth/runner/probe/agent.py index bacf278b4..ea82aaea7 100644 --- a/goth/runner/probe/agent.py +++ b/goth/runner/probe/agent.py @@ -55,9 +55,8 @@ def _init_log_monitor(self): log_config.base_dir = probe.container.log_config.base_dir self.log_monitor = LogEventMonitor(self.name, log_config) - self._last_checked_line = -1 - async def wait_for_log(self, pattern: str, timeout: float = 1000) -> LogEvent: + async def wait_for_log(self, pattern: str, timeout: Optional[float] = None) -> LogEvent: """Search agent logs for a log line with the message matching `pattern`.""" entry = await self.log_monitor.wait_for_entry(pattern, timeout) return entry From a9d0d890540952e8b6358e1b587be51963ed465c Mon Sep 17 00:00:00 2001 From: Kuba Mazurek Date: Tue, 30 Mar 2021 20:51:04 +0200 Subject: [PATCH 13/13] Create add_agent method, use OrderedDict for storing agents --- goth/runner/probe/__init__.py | 29 ++++++++++++++---- goth/runner/probe/agent.py | 12 ++------ goth/runner/probe/mixin.py | 5 ++-- test/goth/runner/probe/test_agent.py | 44 ---------------------------- 4 files changed, 28 insertions(+), 62 deletions(-) delete mode 100644 test/goth/runner/probe/test_agent.py diff --git a/goth/runner/probe/__init__.py b/goth/runner/probe/__init__.py index 4dcb54be4..5d1c658ef 100644 --- a/goth/runner/probe/__init__.py +++ b/goth/runner/probe/__init__.py @@ -2,6 +2,7 @@ import abc import asyncio +from collections import OrderedDict import contextlib import copy import logging @@ -10,8 +11,8 @@ AsyncIterator, Dict, Iterator, + List, Optional, - Set, Tuple, TYPE_CHECKING, ) @@ -69,9 +70,6 @@ class Probe(abc.ABC): in subclasses (see `ProviderProbe` and `RequestorProbe`). """ - agents: Set[AgentComponent] - """Set of agent components that will be started as part of this probe.""" - api: RestApiComponent """Component with clients for all three yagna REST APIs.""" @@ -87,6 +85,12 @@ class Probe(abc.ABC): ip_address: Optional[str] = None """An IP address of the daemon's container in the Docker network.""" + _agents: "OrderedDict[str, AgentComponent]" + """Collection of agent components that will be started as part of this probe. + + Keys are agent names, values are subclasses of `AgentComponent`. + """ + _docker_client: DockerClient """A docker client used to create the deamon's container.""" @@ -107,8 +111,8 @@ def __init__( config: YagnaContainerConfig, log_config: LogConfig, ): - self.agents = set() self.runner = runner + self._agents = OrderedDict() self._docker_client = client self._logger = ProbeLoggingAdapter( logger, {ProbeLoggingAdapter.EXTRA_PROBE_NAME: config.name} @@ -121,6 +125,11 @@ def __init__( def __str__(self): return self.name + @property + def agents(self) -> List[AgentComponent]: + """List of agent components that will be started as part of this probe.""" + return list(self._agents.values()) + @property def address(self) -> Optional[str]: """Return address from id marked as default.""" @@ -154,6 +163,14 @@ def _setup_gftp_proxy(self, config: YagnaContainerConfig) -> YagnaContainerConfi return new_config + def add_agent(self, agent: AgentComponent) -> None: + """Add an agent to be started for this probe.""" + if self._agents.get(agent.name): + raise KeyAlreadyExistsError( + f"Probe already has agent component with name: `{agent.name}`" + ) + self._agents[agent.name] = agent + async def start(self) -> None: """Start the probe.""" @@ -384,4 +401,4 @@ def __init__( ): super().__init__(runner, client, config, log_config) self.provider_agent = ProviderAgentComponent(self, subnet, agent_preset) - self.agents.add(self.provider_agent) + self.add_agent(self.provider_agent) diff --git a/goth/runner/probe/agent.py b/goth/runner/probe/agent.py index ea82aaea7..49cff3af9 100644 --- a/goth/runner/probe/agent.py +++ b/goth/runner/probe/agent.py @@ -56,19 +56,13 @@ def _init_log_monitor(self): self.log_monitor = LogEventMonitor(self.name, log_config) - async def wait_for_log(self, pattern: str, timeout: Optional[float] = None) -> LogEvent: + async def wait_for_log( + self, pattern: str, timeout: Optional[float] = None + ) -> LogEvent: """Search agent logs for a log line with the message matching `pattern`.""" entry = await self.log_monitor.wait_for_entry(pattern, timeout) return entry - def __hash__(self): - return hash(self.name) - - def __eq__(self, other): - if type(self) is type(other): - return self.__hash__() == other.__hash__() - return False - class ProviderAgentComponent(AgentComponent): """Probe component which runs `ya-provider` in the probe's container.""" diff --git a/goth/runner/probe/mixin.py b/goth/runner/probe/mixin.py index ae007b3e0..dd2288554 100644 --- a/goth/runner/probe/mixin.py +++ b/goth/runner/probe/mixin.py @@ -9,7 +9,6 @@ List, Optional, Protocol, - Set, Sequence, Tuple, TYPE_CHECKING, @@ -36,8 +35,8 @@ class ProbeProtocol(Protocol): This is mainly to fix mypy errors when using `Probe` directly as `self` type. """ - agents: "Set[AgentComponent]" - """Set of agent components that will be started as part of a probe.""" + agents: "List[AgentComponent]" + """List of agent components that will be started as part of a probe.""" api: "RestApiComponent" """REST API probe component.""" diff --git a/test/goth/runner/probe/test_agent.py b/test/goth/runner/probe/test_agent.py deleted file mode 100644 index 0ab29f474..000000000 --- a/test/goth/runner/probe/test_agent.py +++ /dev/null @@ -1,44 +0,0 @@ -"""Unit tests for probe agents.""" - -from unittest.mock import MagicMock - -from goth.runner.probe import Probe -from goth.runner.probe.agent import AgentComponent - - -class MockAgentComponent(AgentComponent): - """`AgentComponent` implementation used for testing.""" - - def start(self, *args, **kwargs): - """Abstract method implementation.""" - pass - - def _init_log_monitor(self): - pass - - -def test_agent_duplicate_names(): - """Test if two agent objects with the same name will be treated as equal.""" - - agents = set() - agent_name = "test_agent" - probe = MagicMock(spec=Probe) - - first_agent = MockAgentComponent(probe, agent_name) - agents.add(first_agent) - agents.add(MockAgentComponent(probe, agent_name)) - - assert len(agents) == 1 - assert agents.pop() == first_agent - - -def test_agent_different_names(): - """Test if two agent objects with different names will be treated as different.""" - - agents = set() - probe = MagicMock(spec=Probe) - - agents.add(MockAgentComponent(probe, "agent_1")) - agents.add(MockAgentComponent(probe, "agent_2")) - - assert len(agents) == 2