Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration test for multi-activity agreement #350

Merged
merged 4 commits into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions examples/blender/blender.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ async def worker(ctx: WorkContext, tasks):
output_file = f"output_{frame}.png"
ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file)
try:
# Set timeout for executing the script on the provider. Two minutes is plenty
# of time for computing a single frame, for other tasks it may be not enough.
# If the timeout is exceeded, this worker instance will be shut down and all
# remaining tasks, including the current one, will be computed by other providers.
yield ctx.commit(timeout=timedelta(seconds=120))
# Set timeout for executing the script on the provider. Usually, 30 seconds
# should be more than enough for computing a single frame, however a provider
# may require more time for the first task if it needs to download a VM image
# first. Once downloaded, the VM image will be cached and other tasks that use
# that image will be computed faster.
yield ctx.commit(timeout=timedelta(minutes=10))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The requestor will have to wait 10 minutes if any provider is broken (there are currently providers that are signing agreements and then doing nothing).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. But the problem with the timeout being too low is recurring and sometimes affects all providers (for example it happened after 0.6.3 yagna release, when many providers needed to refresh their VM images and were unable to do so for a long time since the activities were interrupted by our blender.py requestor due to 2 minute timeouts).

# TODO: Check if job results are valid
# and reject by: task.reject_task(reason = 'invalid file')
task.accept_result(result=output_file)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ ya-market = "0.1.0"
[tool.poe.tasks]
test = "pytest --cov=yapapi --ignore tests/goth"
goth-assets = "python -m goth create-assets tests/goth/assets"
goth-tests = "pytest -svx tests/goth"
goth-tests = "pytest -svx tests/goth --config-override docker-compose.build-environment.release-tag=0.6."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this parameter used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used to enforce using yagna release 0.6.x for the tests.

typecheck = "mypy ."
codestyle = "black --check --diff ."
_liccheck_export = "poetry export -E cli -f requirements.txt -o .requirements.txt"
Expand Down
42 changes: 42 additions & 0 deletions tests/goth/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,38 @@
from datetime import datetime, timezone
from pathlib import Path
from typing import cast, List, Tuple

import pytest

from goth.configuration import Override
from yapapi.package import Package, vm


def pytest_addoption(parser):
"""Add optional parameters to pytest CLI invocations."""

parser.addoption(
"--config-override",
action="append",
help="Set an override for a value specified in goth-config.yml file. \
This argument may be used multiple times. \
Values must follow the convention: {yaml_path}={value}, e.g.: \
`docker-compose.build-environment.release-tag=0.6.`",
)


@pytest.fixture(scope="function")
def config_overrides(request) -> List[Override]:
"""Fixture parsing --config-override params passed to the test invocation.

This fixture has "function" scope, which means that each test function will
receive its own copy of the list of config overrides and may modify it at will,
without affecting other test functions run in the same session.
"""

overrides: List[str] = request.config.option.config_override or []
return cast(List[Override], [tuple(o.split("=", 1)) for o in overrides])


@pytest.fixture(scope="session")
def project_dir() -> Path:
Expand All @@ -17,3 +47,15 @@ def log_dir() -> Path:
log_dir = base_dir / f"goth_{date_str}"
log_dir.mkdir(parents=True)
return log_dir


@pytest.fixture()
def blender_vm_package():
async def coro():
return await vm.repo(
image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
min_mem_gib=0.5,
min_storage_gib=2.0,
)

return coro()
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ async def assert_all_tasks_computed(stream):
async def test_agreement_termination(
project_dir: Path,
log_dir: Path,
config_overrides,
) -> None:

# This is the default configuration with 2 wasm/VM providers
goth_config = load_yaml(project_dir / "tests" / "goth" / "assets" / "goth-config.yml")
goth_config = load_yaml(
project_dir / "tests" / "goth" / "assets" / "goth-config.yml",
config_overrides,
)
test_script_path = str(Path(__file__).parent / "requestor.py")

configure_logging(log_dir)
Expand Down
49 changes: 49 additions & 0 deletions tests/goth/test_multiactivity_agreement/requestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env python3
"""A requestor script for testing if multiple workers are run for an agreement."""
import asyncio
from datetime import timedelta
import logging

from yapapi import Executor, Task
from yapapi.log import enable_default_logger, log_event_repr # noqa
from yapapi.package import vm


async def main():

vm_package = await vm.repo(
image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
min_mem_gib=0.5,
min_storage_gib=2.0,
)

async def worker(work_ctx, tasks):
"""Compute just one task and exit."""
async for task in tasks:
work_ctx.run("/bin/sleep", "1")
yield work_ctx.commit()
task.accept_result()
return

async with Executor(
budget=10.0,
package=vm_package,
max_workers=1,
subnet_tag="goth",
timeout=timedelta(minutes=6),
event_consumer=log_event_repr,
) as executor:

tasks = [Task(data=n) for n in range(3)]
async for task in executor.submit(worker, tasks):
print(f"Task computed: {task}")


if __name__ == "__main__":

enable_default_logger()
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
logging.getLogger("yapapi.events").addHandler(console_handler)

asyncio.run(main())
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""A goth test scenario for multi-activity agreements."""
from functools import partial
import logging
import os
from pathlib import Path
import re

import pytest

import goth.configuration
from goth.runner import Runner
from goth.runner.probe import RequestorProbe
from goth.runner.log import configure_logging


logger = logging.getLogger("goth.test.multiactivity_agreement")


async def assert_agreement_created(events):
"""Assert that `AgreementCreated` event occurs."""

async for line in events:
m = re.match(r"AgreementCreated\(agr_id='([^']+)'", line)
if m:
return m.group(1)
raise AssertionError("Expected AgreementCreated event")


async def assert_multiple_workers_run(agr_id, events):
"""Assert that more than one worker is run with given `agr_id`.

Fails if a worker failure is detected or if a worker has run for another agreement.
"""
workers_finished = 0

async for line in events:
m = re.match(r"WorkerFinished\(agr_id='([^']+)'", line)
if m:
worker_agr_id = m.group(1)
assert worker_agr_id == agr_id, "Worker run for another agreement"
assert line.endswith(" exc_info=None)"), "Worker finished with error"
workers_finished += 1
elif re.match("ComputationFinished", line):
break

assert workers_finished > 1, (
f"Only {workers_finished} worker(s) run for agreement {agr_id}, " "expected more than one"
)


@pytest.mark.asyncio
async def test_multiactivity_agreement(project_dir: Path, log_dir: Path, config_overrides) -> None:

configure_logging(log_dir)

# Override the default test configuration to create only one provider node
nodes = [
{"name": "requestor", "type": "Requestor"},
{"name": "provider-1", "type": "VM-Wasm-Provider", "use-proxy": True},
]
config_overrides.append(("nodes", nodes))
goth_config = goth.configuration.load_yaml(
project_dir / "tests" / "goth" / "assets" / "goth-config.yml",
config_overrides,
)

runner = Runner(base_log_dir=log_dir, compose_config=goth_config.compose_config)

async with runner(goth_config.containers):

requestor = runner.get_probes(probe_type=RequestorProbe)[0]

async with requestor.run_command_on_host(
str(Path(__file__).parent / "requestor.py"), env=os.environ
) as (_cmd_task, cmd_monitor):

# Wait for agreement
assertion = cmd_monitor.add_assertion(assert_agreement_created)
agr_id = await assertion.wait_for_result(timeout=30)

# Wait for multiple workers run for the agreement
assertion = cmd_monitor.add_assertion(
partial(assert_multiple_workers_run, agr_id),
name=f"assert_multiple_workers_run({agr_id})",
)
await assertion.wait_for_result(timeout=60)
9 changes: 4 additions & 5 deletions tests/goth/test_run_blender.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,12 @@ async def assert_all_invoices_accepted(output_lines: EventStream[str]):


@pytest.mark.asyncio
async def test_run_blender(
log_dir: Path,
project_dir: Path,
) -> None:
async def test_run_blender(log_dir: Path, project_dir: Path, config_overrides) -> None:

# This is the default configuration with 2 wasm/VM providers
goth_config = load_yaml(Path(__file__).parent / "assets" / "goth-config.yml")
goth_config = load_yaml(
Path(__file__).parent / "assets" / "goth-config.yml", overrides=config_overrides
)

blender_path = project_dir / "examples" / "blender" / "blender.py"

Expand Down