From 054f0681ff3c0d65f425f4b5a85bba9be141654d Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 16 Nov 2022 11:48:56 +0100 Subject: [PATCH 1/3] recycle IP addresses for terminated services --- yapapi/network.py | 11 +++++++++++ yapapi/rest/net.py | 3 +++ yapapi/services/service.py | 3 +++ yapapi/services/service_runner.py | 3 +++ 4 files changed, 20 insertions(+) diff --git a/yapapi/network.py b/yapapi/network.py index c362b1d07..b9e79b516 100644 --- a/yapapi/network.py +++ b/yapapi/network.py @@ -269,6 +269,17 @@ async def add_node(self, node_id: str, ip: Optional[str] = None) -> Node: return node + async def remove_node(self, node_id: str) -> None: + """Remove the node from the network. + + :param node_id: Node ID within the Golem network of the VPN node to be removed. + """ + + await self._net_api.remove_node(self.network_id, node_id) + + async with self._nodes_lock: + del self._nodes[node_id] + async def _refresh_node(self, node: Node): logger.debug("refreshing node %s", node) try: diff --git a/yapapi/rest/net.py b/yapapi/rest/net.py index e6e323b86..bc5ac3ea1 100644 --- a/yapapi/rest/net.py +++ b/yapapi/rest/net.py @@ -35,3 +35,6 @@ async def add_address(self, network_id: str, ip: str): async def add_node(self, network_id: str, node_id: str, ip: str): await self._api.add_node(network_id, yan.Node(node_id, ip)) + + async def remove_node(self, network_id: str, node_id: str): + await self._api.remove_node(network_id, node_id) diff --git a/yapapi/services/service.py b/yapapi/services/service.py index e5a7a2fa4..13a792cd5 100644 --- a/yapapi/services/service.py +++ b/yapapi/services/service.py @@ -132,6 +132,9 @@ def _set_ctx(self, ctx: WorkContext) -> None: def _set_network_node(self, node: Node) -> None: self._network_node = node + def _clear_network_node(self) -> None: + self._network_node = None + def __repr__(self): class_name = type(self).__name__ state = self.state.value diff --git a/yapapi/services/service_runner.py b/yapapi/services/service_runner.py 
index 930224e24..6a01568e7 100644 --- a/yapapi/services/service_runner.py +++ b/yapapi/services/service_runner.py @@ -421,6 +421,9 @@ async def _worker(work_context: WorkContext) -> None: work_context.emit(events.WorkerFinished, exc_info=sys.exc_info()) raise finally: + if network and service.network_node: + await network.remove_node(work_context.provider_id) + service._clear_network_node() await self._job.engine.accept_payments_for_agreement(self._job.id, agreement.id) await self._job.agreements_pool.release_agreement(agreement.id, allow_reuse=False) From 0c29684b896cb4f206a3d1f8314fc990793b37bd Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 16 Nov 2022 13:49:46 +0100 Subject: [PATCH 2/3] add an integration test for the IP recycling issue --- pyproject.toml | 2 +- .../test_recycle_ip/ssh_recycle_ip.py | 100 +++++++++++++++ .../test_recycle_ip/test_recycle_ip.py | 117 ++++++++++++++++++ 3 files changed, 218 insertions(+), 1 deletion(-) create mode 100755 tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py create mode 100644 tests/goth_tests/test_recycle_ip/test_recycle_ip.py diff --git a/pyproject.toml b/pyproject.toml index eea7ba43f..94e6cc721 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -114,7 +114,7 @@ ya-market = "0.1.0" [tool.poe.tasks] test = "pytest --cov=yapapi --ignore tests/goth_tests" goth-assets = "python -m goth create-assets tests/goth_tests/assets" -goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.release-tag=pre-rel-v0.12.0-rc2 --config-path tests/goth_tests/assets/goth-config.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError" +goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=true --config-path tests/goth_tests/assets/goth-config.yml 
--ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError" typecheck = "mypy --check-untyped-defs --no-implicit-optional ." _codestyle_isort = "isort --check-only --diff ." _codestyle_black = "black --check --diff ." diff --git a/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py b/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py new file mode 100755 index 000000000..3474762b3 --- /dev/null +++ b/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +import asyncio +from datetime import timedelta +import random +import string + +from yapapi import Golem +from yapapi.contrib.service.socket_proxy import SocketProxy, SocketProxyService +from yapapi.payload import vm + + +first_time = True + + +class SshService(SocketProxyService): + remote_port = 22 + + def __init__(self, proxy: SocketProxy): + super().__init__() + self.proxy = proxy + + @staticmethod + async def get_payload(): + return await vm.repo( + image_hash="1e06505997e8bd1b9e1a00bd10d255fc6a390905e4d6840a22a79902", # ssh example + capabilities=[vm.VM_CAPS_VPN], + ) + + async def start(self): + global first_time + + async for script in super().start(): + yield script + + password = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(8)) + + script = self._ctx.new_script(timeout=timedelta(seconds=10)) + + if first_time: + first_time = False + raise Exception("intentional failure on the first run") + + script.run("/bin/bash", "-c", "syslogd") + script.run("/bin/bash", "-c", "ssh-keygen -A") + script.run("/bin/bash", "-c", f'echo -e "{password}\n{password}" | passwd') + script.run("/bin/bash", "-c", "/usr/sbin/sshd") + yield script + + server = await self.proxy.run_server(self, self.remote_port) + + print( + f"connect with:\n" + f"ssh -o UserKnownHostsFile=/dev/null -o 
StrictHostKeyChecking=no " + f"-p {server.local_port} root@{server.local_address}" + ) + print(f"password: {password}") + + +async def main(): + async with Golem( + budget=1.0, + subnet_tag="goth", + ) as golem: + network = await golem.create_network("192.168.0.1/24") + proxy = SocketProxy(ports=[2222]) + + async with network: + cluster = await golem.run_service( + SshService, + network=network, + instance_params=[{"proxy": proxy}], + network_addresses=["192.168.0.2"], + ) + instances = cluster.instances + + while True: + print(instances) + print(".....", network._nodes) + try: + await asyncio.sleep(5) + except (KeyboardInterrupt, asyncio.CancelledError): + break + + await proxy.stop() + cluster.stop() + + cnt = 0 + while cnt < 3 and any(s.is_available for s in instances): + print(instances) + await asyncio.sleep(5) + cnt += 1 + + +if __name__ == "__main__": + try: + loop = asyncio.get_event_loop() + task = loop.create_task(main()) + loop.run_until_complete(task) + except KeyboardInterrupt: + pass diff --git a/tests/goth_tests/test_recycle_ip/test_recycle_ip.py b/tests/goth_tests/test_recycle_ip/test_recycle_ip.py new file mode 100644 index 000000000..e88b2a8f9 --- /dev/null +++ b/tests/goth_tests/test_recycle_ip/test_recycle_ip.py @@ -0,0 +1,117 @@ +"""An integration test scenario that runs the SSH example requestor app.""" +import asyncio +import logging +import os +from pathlib import Path +import pexpect +import pytest +import re +import signal +import sys +import time +from typing import List + +from goth.configuration import Override, load_yaml +from goth.runner import Runner +from goth.runner.log import configure_logging +from goth.runner.probe import RequestorProbe + +from goth_tests.assertions import assert_all_invoices_accepted, assert_no_errors + +logger = logging.getLogger("goth.test.recycle_ips") + +SUBNET_TAG = "goth" + + +@pytest.mark.asyncio +async def test_recycle_ip( + log_dir: Path, + project_dir: Path, + goth_config_path: Path, + 
config_overrides: List[Override], + ssh_verify_connection: bool, +) -> None: + + if ssh_verify_connection: + ssh_check = pexpect.spawn("/usr/bin/which ssh") + exit_code = ssh_check.wait() + if exit_code != 0: + raise ProcessLookupError( + "ssh binary not found, please install it or check your PATH. " + "You may also skip the connection check by omitting `--ssh-verify-connection`." + ) + + configure_logging(log_dir) + + # This is the default configuration with 2 wasm/VM providers + goth_config = load_yaml(goth_config_path, config_overrides) + + # disable the mitm proxy used to capture the requestor agent -> daemon API calls + # because it doesn't support websockets which are needed to enable VPN connections + requestor = [c for c in goth_config.containers if c.name == "requestor"][0] + requestor.use_proxy = False + + requestor_path = str(Path(__file__).parent / "ssh_recycle_ip.py") + + runner = Runner( + base_log_dir=log_dir, + compose_config=goth_config.compose_config, + ) + + async with runner(goth_config.containers): + + requestor = runner.get_probes(probe_type=RequestorProbe)[0] + + async with requestor.run_command_on_host( + f"{requestor_path} --subnet-tag {SUBNET_TAG}", + env=os.environ, + ) as (_cmd_task, cmd_monitor, process_monitor): + cmd_monitor.add_assertion(assert_no_errors) + cmd_monitor.add_assertion(assert_all_invoices_accepted) + + ssh_connections = [] + + # A longer timeout to account for downloading a VM image + ssh_string = await cmd_monitor.wait_for_pattern("ssh -o", timeout=120) + matches = re.match("ssh -o .* -p (\\d+) (root@.*)", ssh_string) + port = matches.group(1) + address = matches.group(2) + password = re.sub("password: ", "", await cmd_monitor.wait_for_pattern("password:")) + + ssh_connections.append((port, address, password)) + + await cmd_monitor.wait_for_pattern(".*SshService running on provider", timeout=10) + logger.info("SSH service instances started") + + if not ssh_verify_connection: + logger.warning( + "Skipping SSH
connection check. Use `--ssh-verify-connection` to perform it." + ) + else: + for port, address, password in ssh_connections: + args = [ + "ssh", + "-o", + "UserKnownHostsFile=/dev/null", + "-o", + "StrictHostKeyChecking=no", + "-p", + port, + address, + "uname -v", + ] + + logger.debug("running ssh with: %s", args) + + ssh = pexpect.spawn(" ".join(args)) + ssh.expect("[pP]assword:", timeout=5) + ssh.sendline(password) + ssh.expect("#1-Alpine SMP", timeout=5) + ssh.expect(pexpect.EOF, timeout=5) + logger.info("Connection to port %s confirmed.", port) + + logger.info("SSH connections confirmed.") + + proc: asyncio.subprocess.Process = await process_monitor.get_process() + proc.send_signal(signal.SIGINT) + logger.info("Sent SIGINT...") From a4e6b00450f2a885a84620eb71ae1d96826310f6 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 16 Nov 2022 14:02:09 +0100 Subject: [PATCH 3/3] isort ;p goth test fix --- pyproject.toml | 2 +- tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py | 1 - tests/goth_tests/test_recycle_ip/test_recycle_ip.py | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 94e6cc721..0a06ff8ae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -114,7 +114,7 @@ ya-market = "0.1.0" [tool.poe.tasks] test = "pytest --cov=yapapi --ignore tests/goth_tests" goth-assets = "python -m goth create-assets tests/goth_tests/assets" -goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=true --config-path tests/goth_tests/assets/goth-config.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError" +goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=true --config-path tests/goth_tests/assets/goth-config-testing.yml 
--ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError" typecheck = "mypy --check-untyped-defs --no-implicit-optional ." _codestyle_isort = "isort --check-only --diff ." _codestyle_black = "black --check --diff ." diff --git a/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py b/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py index 3474762b3..3e43a5517 100755 --- a/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py +++ b/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py @@ -8,7 +8,6 @@ from yapapi.contrib.service.socket_proxy import SocketProxy, SocketProxyService from yapapi.payload import vm - first_time = True diff --git a/tests/goth_tests/test_recycle_ip/test_recycle_ip.py b/tests/goth_tests/test_recycle_ip/test_recycle_ip.py index e88b2a8f9..115d7fe88 100644 --- a/tests/goth_tests/test_recycle_ip/test_recycle_ip.py +++ b/tests/goth_tests/test_recycle_ip/test_recycle_ip.py @@ -16,7 +16,7 @@ from goth.runner.log import configure_logging from goth.runner.probe import RequestorProbe -from goth_tests.assertions import assert_all_invoices_accepted, assert_no_errors +from goth_tests.assertions import assert_all_invoices_accepted, assert_no_errors # isort:skip logger = logging.getLogger("goth.test.recycle_ips")