Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOTH] Renegotiate proposal #551

Merged
merged 6 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions tests/goth_tests/test_renegotiate_proposal/requestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#!/usr/bin/env python3
import asyncio
from asyncio import TimeoutError
import datetime
import sys
from typing_extensions import Final
import ya_market
from yapapi import props as yp
from yapapi.log import enable_default_logger
from yapapi.props.builder import DemandBuilder
from yapapi.rest import Configuration, Market
from yapapi.rest.market import OfferProposal

from examples import utils

DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP: Final[str] = "golem.com.payment.debit-notes.accept-timeout?"
PROPOSALS_LIMIT: Final[int] = 4


async def _respond(proposal: OfferProposal, dbuild) -> str:
dbuild.properties["golem.com.payment.chosen-platform"] = "NGNT"
timeout = proposal.props.get(DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP)
dbuild.properties[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP] = timeout
return await proposal.respond(dbuild.properties, dbuild.constraints)


async def renegotiate_offers(conf: Configuration, subnet_tag: str):
"""Rejects every proposal & then renegotiates it"""
async with conf.market() as client:
market_api = Market(client)
dbuild = DemandBuilder()
dbuild.add(yp.NodeInfo(name="some renegotiating node", subnet_tag=subnet_tag))
dbuild.add(
yp.Activity(
expiration=datetime.datetime.now(datetime.timezone.utc)
+ datetime.timedelta(minutes=30)
)
)

async with market_api.subscribe(dbuild.properties, dbuild.constraints) as subscription:
issuers = set()
proposals = 0
rejected_proposals = set() # Already rejected, don't reject again
async for event in subscription.events():
node_name = event.props.get("golem.node.id.name")
proposal_id = event._proposal.proposal.proposal_id
print(f"\n[{node_name}] {'*'*15} {proposal_id}")
prev_proposal_id = event._proposal.proposal.prev_proposal_id
print(f"[{node_name}] prev_proposal_id: {prev_proposal_id}")
if not event.is_draft:
if proposals > PROPOSALS_LIMIT:
print(f"[node_name] Skipping additional proposal")
break
await _respond(event, dbuild)
proposals += 1
issuers.add(event.issuer)
print(f"[{node_name}] Responded. proposals={proposals}, issuers={len(issuers)}")
continue

print(
f"[{node_name}] Offer: {proposal_id} from {event.issuer} is_draft: {event.is_draft}"
)
if prev_proposal_id not in rejected_proposals:
await event.reject()
print(f"[{node_name}] Rejected {len(rejected_proposals)}. id: {proposal_id}")
await asyncio.sleep(1)
print(f"[{node_name}] Renegotiating. id: {proposal_id}")
new_offer_id = await _respond(event, dbuild)
print(f"[{node_name}] new_offer_id: {new_offer_id}")
rejected_proposals.add(new_offer_id)
continue
print(".create_agreement()")
agreement = await event.create_agreement()
print(".confirm()")
confirm_result = await agreement.confirm()
print(f"[{node_name}] agreement.confirm(): {confirm_result}")
if confirm_result:
terminate_reason = {
"message": "Work cancelled",
"golem.requestor.code": "Cancelled",
}
terminate_result = await agreement.terminate(terminate_reason)
print(f"agreement.terminate(): {terminate_result}")
print("All done")


def main():
subnet = "goth"

enable_default_logger()
try:
asyncio.get_event_loop().run_until_complete(
asyncio.wait_for(
renegotiate_offers(
Configuration(),
subnet_tag=subnet,
),
timeout=140,
)
)
except TimeoutError:
print("Main timeout triggered :(")


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import logging
import os
from pathlib import Path
import pytest
from typing import List

from goth.configuration import load_yaml, Override
from goth.runner.log import configure_logging
from goth.runner import Runner
from goth.runner.probe import RequestorProbe


logger = logging.getLogger("goth.test.renegotiate_proposal")


@pytest.mark.asyncio
async def test_renegotiation(
log_dir: Path,
goth_config_path: Path,
config_overrides: List[Override],
) -> None:

# This is the default configuration with 2 wasm/VM providers
goth_config = load_yaml(goth_config_path, config_overrides)
test_script_path = str(Path(__file__).parent / "requestor.py")

configure_logging(log_dir)

runner = Runner(
base_log_dir=log_dir,
compose_config=goth_config.compose_config,
)

async with runner(goth_config.containers):

requestor = runner.get_probes(probe_type=RequestorProbe)[0]

async with requestor.run_command_on_host(test_script_path, env=os.environ) as (
_cmd_task,
cmd_monitor,
):

await cmd_monitor.wait_for_pattern(r"\[.+\] Renegotiating", timeout=20)
await cmd_monitor.wait_for_pattern(r"agreement.terminate\(\): True", timeout=20)
# assert not "Main timeout triggered :("
await cmd_monitor.wait_for_pattern(r"All done", timeout=20)