From cdb6e004853dd243860bcdeea70253213e09f27d Mon Sep 17 00:00:00 2001 From: Dariusz Rybi Date: Tue, 20 Jul 2021 14:19:30 +0200 Subject: [PATCH 1/4] [GOTH] Renegotiate proposal --- .../test_renegotiate_proposal/requestor.py | 115 ++++++++++++++++++ .../test_renegotiate_proposal.py | 46 +++++++ 2 files changed, 161 insertions(+) create mode 100755 tests/goth_tests/test_renegotiate_proposal/requestor.py create mode 100644 tests/goth_tests/test_renegotiate_proposal/test_renegotiate_proposal.py diff --git a/tests/goth_tests/test_renegotiate_proposal/requestor.py b/tests/goth_tests/test_renegotiate_proposal/requestor.py new file mode 100755 index 000000000..629e5bef5 --- /dev/null +++ b/tests/goth_tests/test_renegotiate_proposal/requestor.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +import asyncio +from asyncio import TimeoutError +import datetime +import sys +from typing_extensions import Final +import ya_market +from yapapi import props as yp +from yapapi.log import enable_default_logger +from yapapi.props.builder import DemandBuilder +from yapapi.rest import Configuration, Market +from yapapi.rest.market import OfferProposal + +from examples import utils + +DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP: Final[str] = "golem.com.payment.debit-notes.accept-timeout?" + + +async def _respond(proposal: OfferProposal, dbuild) -> str: + dbuild.properties["golem.com.payment.chosen-platform"] = "NGNT" + timeout = proposal.props.get(DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP) + dbuild.properties[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP] = timeout + return await proposal.respond(dbuild.properties, dbuild.constraints) + + +async def renegotiate_offers(conf: Configuration, subnet_tag: str): + """Rejects every proposal & then renegotiates it""" + async with conf.market() as client: + market_api = Market(client) + dbuild = DemandBuilder() + dbuild.add(yp.NodeInfo(name="some renegotiating node", subnet_tag=subnet_tag)) + dbuild.add( + yp.Activity( + expiration=datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(minutes=30) + ) + ) + + async with market_api.subscribe(dbuild.properties, dbuild.constraints) as subscription: + issuers = set() + proposals = 0 + rejected_proposals = set() # Already rejected, don't reject again + async for event in subscription.events(): + node_name = event.props.get("golem.node.id.name") + proposal_id = event._proposal.proposal.proposal_id + print(f"\n[{node_name}] {'*'*15} {proposal_id}") + prev_proposal_id = event._proposal.proposal.prev_proposal_id + print(f"[{node_name}] prev_proposal_id: {prev_proposal_id}") + if not event.is_draft: + if proposals > 4: + print(f"[node_name] Skipping additional proposal") + break + try: + await _respond(event, dbuild) + except ya_market.exceptions.ApiException as e: + print(f"[{node_name}] respond error: {e}") + continue + proposals += 1 + issuers.add(event.issuer) + print(f"[{node_name}] Responded {proposals} {len(issuers)}") + continue + + # print(f"props {json.dumps(event.props, indent=4)}") + print( + f"[{node_name}] Offer: {proposal_id} from {event.issuer} is_draft: {event.is_draft}" + ) + if prev_proposal_id not in rejected_proposals: + await event.reject() + print(f"[{node_name}] Rejected {len(rejected_proposals)}. id: {proposal_id}") + await asyncio.sleep(1) + print(f"[{node_name}] Renegotiating. id: {proposal_id}") + try: + new_offer_id = await _respond(event, dbuild) + print(f"[{node_name}] new_offer_id: {new_offer_id}") + rejected_proposals.add(new_offer_id) + except ya_market.exceptions.ApiException as e: + print(f"[{node_name}] respond error: {e}") + continue + continue + print(".create_agreement()") + agreement = await event.create_agreement() + print(".confirm()") + confirm_result = await agreement.confirm() + print(f"[{node_name}] agreement.confirm(): {confirm_result}") + if confirm_result: + terminate_reason = { + "message": "Work cancelled", + "golem.requestor.code": "Cancelled", + } + terminate_result = await agreement.terminate(terminate_reason) + print(f"agreement.terminate(): {terminate_result}") + print("All done") + + +def main(): + subnet = "goth" + sys.stderr.write(f"Using subnet: {utils.TEXT_COLOR_YELLOW}{subnet}{utils.TEXT_COLOR_DEFAULT}\n") + + enable_default_logger() + try: + asyncio.get_event_loop().run_until_complete( + asyncio.wait_for( + renegotiate_offers( + Configuration(), + subnet_tag=subnet, + ), + timeout=140, + ) + ) + except TimeoutError: + print("Main timeout triggered :(") + + +if __name__ == "__main__": + main() diff --git a/tests/goth_tests/test_renegotiate_proposal/test_renegotiate_proposal.py b/tests/goth_tests/test_renegotiate_proposal/test_renegotiate_proposal.py new file mode 100644 index 000000000..fe84ffa70 --- /dev/null +++ b/tests/goth_tests/test_renegotiate_proposal/test_renegotiate_proposal.py @@ -0,0 +1,46 @@ +import logging +import os +from pathlib import Path +import pytest +from typing import List + +from goth.configuration import load_yaml, Override +from goth.runner.log import configure_logging +from goth.runner import Runner +from goth.runner.probe import RequestorProbe + + +logger = logging.getLogger("goth.test.renegotiate_proposal") + + +@pytest.mark.asyncio +async def test_renegotiation( + log_dir: Path, + goth_config_path: Path, + config_overrides: List[Override], +) -> None: + + # This is the default configuration with 2 wasm/VM providers + goth_config = load_yaml(goth_config_path, config_overrides) + test_script_path = str(Path(__file__).parent / "requestor.py") + + configure_logging(log_dir) + + runner = Runner( + base_log_dir=log_dir, + compose_config=goth_config.compose_config, + ) + + async with runner(goth_config.containers): + + requestor = runner.get_probes(probe_type=RequestorProbe)[0] + + async with requestor.run_command_on_host(test_script_path, env=os.environ) as ( + _cmd_task, + cmd_monitor, + ): + + await cmd_monitor.wait_for_pattern(r"\[.+\] Renegotiating", timeout=20) + await cmd_monitor.wait_for_pattern(r"agreement.terminate\(\): True", timeout=20) + # assert not "Main timeout triggered :(" + await cmd_monitor.wait_for_pattern(r"All done", timeout=20) From d3745876f0e5ab4c5665cd7eb0e378b650482066 Mon Sep 17 00:00:00 2001 From: Dariusz Rybi Date: Mon, 2 Aug 2021 16:36:37 +0200 Subject: [PATCH 2/4] [review] Refactor --- tests/goth_tests/test_renegotiate_proposal/requestor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/goth_tests/test_renegotiate_proposal/requestor.py b/tests/goth_tests/test_renegotiate_proposal/requestor.py index 629e5bef5..fec865b85 100755 --- a/tests/goth_tests/test_renegotiate_proposal/requestor.py +++ b/tests/goth_tests/test_renegotiate_proposal/requestor.py @@ -14,6 +14,7 @@ from examples import utils DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP: Final[str] = "golem.com.payment.debit-notes.accept-timeout?" +PROPOSALS_LIMIT: Final[int] = 4 async def _respond(proposal: OfferProposal, dbuild) -> str: @@ -47,7 +48,7 @@ async def renegotiate_offers(conf: Configuration, subnet_tag: str): prev_proposal_id = event._proposal.proposal.prev_proposal_id print(f"[{node_name}] prev_proposal_id: {prev_proposal_id}") if not event.is_draft: - if proposals > 4: + if proposals > PROPOSALS_LIMIT: print(f"[node_name] Skipping additional proposal") break try: @@ -94,7 +95,6 @@ async def renegotiate_offers(conf: Configuration, subnet_tag: str): def main(): subnet = "goth" - sys.stderr.write(f"Using subnet: {utils.TEXT_COLOR_YELLOW}{subnet}{utils.TEXT_COLOR_DEFAULT}\n") enable_default_logger() try: From 4b073bbb10f3e7dee0c5a9dd5582e1b8f03b36cf Mon Sep 17 00:00:00 2001 From: Dariusz Rybi Date: Mon, 2 Aug 2021 16:37:34 +0200 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Kuba Mazurek --- tests/goth_tests/test_renegotiate_proposal/requestor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/goth_tests/test_renegotiate_proposal/requestor.py b/tests/goth_tests/test_renegotiate_proposal/requestor.py index fec865b85..1aea9e61f 100755 --- a/tests/goth_tests/test_renegotiate_proposal/requestor.py +++ b/tests/goth_tests/test_renegotiate_proposal/requestor.py @@ -58,10 +58,9 @@ async def renegotiate_offers(conf: Configuration, subnet_tag: str): continue proposals += 1 issuers.add(event.issuer) - print(f"[{node_name}] Responded {proposals} {len(issuers)}") + print(f"[{node_name}] Responded. proposals={proposals}, issuers={len(issuers)}") continue - # print(f"props {json.dumps(event.props, indent=4)}") print( f"[{node_name}] Offer: {proposal_id} from {event.issuer} is_draft: {event.is_draft}" ) From 66a43c9b6642ce535fbc184952012f0fd3f0b1f1 Mon Sep 17 00:00:00 2001 From: Dariusz Rybi Date: Mon, 2 Aug 2021 17:14:10 +0200 Subject: [PATCH 4/4] [review] Remove unnecessary try..except --- .../test_renegotiate_proposal/requestor.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/tests/goth_tests/test_renegotiate_proposal/requestor.py b/tests/goth_tests/test_renegotiate_proposal/requestor.py index 1aea9e61f..e187754f3 100755 --- a/tests/goth_tests/test_renegotiate_proposal/requestor.py +++ b/tests/goth_tests/test_renegotiate_proposal/requestor.py @@ -51,11 +51,7 @@ async def renegotiate_offers(conf: Configuration, subnet_tag: str): if proposals > PROPOSALS_LIMIT: print(f"[node_name] Skipping additional proposal") break - try: - await _respond(event, dbuild) - except ya_market.exceptions.ApiException as e: - print(f"[{node_name}] respond error: {e}") - continue + await _respond(event, dbuild) proposals += 1 issuers.add(event.issuer) print(f"[{node_name}] Responded. proposals={proposals}, issuers={len(issuers)}") @@ -69,13 +65,9 @@ async def renegotiate_offers(conf: Configuration, subnet_tag: str): print(f"[{node_name}] Rejected {len(rejected_proposals)}. id: {proposal_id}") await asyncio.sleep(1) print(f"[{node_name}] Renegotiating. id: {proposal_id}") - try: - new_offer_id = await _respond(event, dbuild) - print(f"[{node_name}] new_offer_id: {new_offer_id}") - rejected_proposals.add(new_offer_id) - except ya_market.exceptions.ApiException as e: - print(f"[{node_name}] respond error: {e}") - continue + new_offer_id = await _respond(event, dbuild) + print(f"[{node_name}] new_offer_id: {new_offer_id}") + rejected_proposals.add(new_offer_id) continue print(".create_agreement()") agreement = await event.create_agreement()