Skip to content

Commit

Permalink
Add assertions for blender.py output; update goth version in dependen…
Browse files Browse the repository at this point in the history
…cies
  • Loading branch information
azawlocki committed Mar 23, 2021
1 parent 1b68f71 commit 22a121b
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ toml = "^0.10.1"
srvresolver = "^0.3.5"
colorama = "^0.4.4"

goth = {git = "https://github.com/golemfactory/goth.git", branch = "az/yapapi-ci", optional = true, python = "^3.8.0"}
goth = {git = "https://github.com/golemfactory/goth.git", branch = "az/command-output-monitor", optional = true, python = "^3.8.0"}
# goth = {path = "../goth", develop = true, optional = true, python = "^3.8.0"}

[tool.poetry.extras]
Expand Down
2 changes: 1 addition & 1 deletion tests/goth/assets/goth-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ docker-compose:
# deb-path: ...
# branch: ...
# commit-hash: ...
release-tag: "0.6.1-rc5"
# release-tag: ... (e.g. "latest", "0.6.2")

compose-log-patterns:
ethereum: ".*Wallets supplied."
Expand Down
95 changes: 87 additions & 8 deletions tests/goth/test_run_blender.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,77 @@
import asyncio
import logging
import os
from pathlib import Path
import re

import pytest

from goth.assertions import EventStream
from goth.configuration import load_yaml
from goth.runner.log import configure_logging
from goth.runner import Runner
from goth.runner.probe import RequestorProbe


logger = logging.getLogger("goth.test")
logger = logging.getLogger("goth.test.run_blender")

ALL_TASKS = {0, 10, 20, 30, 40, 50}


# Temporal assertions expressing properties of sequences of "events". In this case, each "event"
# is just a line of output from `blender.py`.


async def assert_no_errors(output_lines: EventStream[str]):
"""Assert that no output line contains the substring `ERROR`."""
async for line in output_lines:
if "ERROR" in line:
raise AssertionError("Command reported ERROR")


async def assert_all_tasks_processed(status: str, output_lines: EventStream[str]):
"""Assert that for every task in `ALL_TASKS` a line with `Task {status}` will appear."""
remaining_tasks = ALL_TASKS.copy()

async for line in output_lines:
m = re.search(rf".*Task {status} .* task data: ([0-9]+)", line)
if m:
task_data = int(m.group(1))
logger.debug("assert_all_tasks_processed: Task %s: %d", status, task_data)
remaining_tasks.discard(task_data)
if not remaining_tasks:
return

raise AssertionError(f"Tasks not {status}: {remaining_tasks}")


async def assert_all_tasks_sent(output_lines: EventStream[str]):
"""Assert that for every task a line with `Task sent` will appear."""
await assert_all_tasks_processed("sent", output_lines)


async def assert_all_tasks_computed(output_lines: EventStream[str]):
"""Assert that for every task a line with `Task computed` will appear."""
await assert_all_tasks_processed("computed", output_lines)


async def assert_all_invoices_accepted(output_lines: EventStream[str]):
"""Assert that an invoice is accepted for every provider that confirmed an agreement."""
unpaid_agreement_providers = set()

async for line in output_lines:
m = re.search("Agreement confirmed by provider '([^']*)'", line)
if m:
prov_name = m.group(1)
logger.debug("assert_all_invoices_accepted: adding provider '%s'", prov_name)
unpaid_agreement_providers.add(prov_name)
m = re.search("Accepted invoice from '([^']*)'", line)
if m:
prov_name = m.group(1)
logger.debug("assert_all_invoices_accepted: adding invoice for '%s'", prov_name)
unpaid_agreement_providers.remove(prov_name)

if unpaid_agreement_providers:
raise AssertionError(f"Unpaid agreements for: {','.join(unpaid_agreement_providers)}")


@pytest.mark.asyncio
Expand All @@ -36,13 +96,32 @@ async def test_run_blender(

requestor = runner.get_probes(probe_type=RequestorProbe)[0]

agent_task = requestor.run_command_on_host(
async with requestor.run_command_on_host(
f"{blender_path} --subnet-tag goth",
env=os.environ,
)
) as (_cmd_task, cmd_monitor):

# Add assertions to the command output monitor `cmd_monitor`:
cmd_monitor.add_assertion(assert_no_errors)
cmd_monitor.add_assertion(assert_all_invoices_accepted)
all_sent = cmd_monitor.add_assertion(assert_all_tasks_sent)
all_computed = cmd_monitor.add_assertion(assert_all_tasks_computed)

await cmd_monitor.wait_for_pattern(".*Received proposals from 2 ", timeout=10)
logger.info("Received proposals")

await cmd_monitor.wait_for_pattern(".*Agreement proposed ", timeout=10)
logger.info("Agreement proposed")

await cmd_monitor.wait_for_pattern(".*Agreement confirmed ", timeout=10)
logger.info("Agreement confirmed")

await all_sent.wait_for_result(timeout=60)
logger.info("All tasks sent")

await all_computed.wait_for_result(timeout=60)
logger.info("All tasks computed, waiting for Executor shutdown")

while not agent_task.done():
logger.info("Waiting for requestor script to complete...")
await asyncio.sleep(5)
await cmd_monitor.wait_for_pattern(".*Executor has shut down", timeout=120)

logger.info("Requestor script finished")
logger.info("Requestor script finished")

0 comments on commit 22a121b

Please sign in to comment.