From 75ba2b2694a5ae75485c5e3bf0853805f31f302d Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Mon, 14 Nov 2022 08:24:23 +0100 Subject: [PATCH 01/16] use `public` as the default subnet --- README.md | 2 +- examples/hello-world/hello.py | 2 +- examples/hello-world/hello_service.py | 2 +- examples/utils/__init__.py | 2 +- tests/factories/props/__init__.py | 2 +- yapapi/engine.py | 2 +- yapapi/golem.py | 4 ++-- 7 files changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 51f2ef83a..4725a0a82 100644 --- a/README.md +++ b/README.md @@ -139,7 +139,7 @@ It's possible to set various elements of `yagna` configuration through environme - `YAGNA_PAYMENT_NETWORK`, Ethereum network name for `yagna` to use, e.g. `rinkeby` - `YAGNA_PAYMENT_DRIVER`, payment driver name for `yagna` to use, e.g. `erc20` - `YAGNA_PAYMENT_URL`, URL to `yagna` payment API, e.g. `http://localhost:7500/payment-api/v1` -- `YAGNA_SUBNET`, name of the `yagna` sub network to be used, e.g. `devnet-beta` +- `YAGNA_SUBNET`, name of the `yagna` sub network to be used, e.g. `public` - `YAPAPI_USE_GFTP_CLOSE`, if set to a _truthy_ value (e.g. "1", "Y", "True", "on") then `yapapi` will ask `gftp` to close files when there's no need to publish them any longer. 
This may greatly reduce the number of files kept open while `yapapi` is running but requires `yagna` diff --git a/examples/hello-world/hello.py b/examples/hello-world/hello.py index ce02abc75..be8f642de 100755 --- a/examples/hello-world/hello.py +++ b/examples/hello-world/hello.py @@ -24,7 +24,7 @@ async def main(): tasks = [Task(data=None)] - async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem: + async with Golem(budget=1.0, subnet_tag="public") as golem: async for completed in golem.execute_tasks(worker, tasks, payload=package): print(completed.result.stdout) diff --git a/examples/hello-world/hello_service.py b/examples/hello-world/hello_service.py index 33b99f721..4e571a049 100755 --- a/examples/hello-world/hello_service.py +++ b/examples/hello-world/hello_service.py @@ -48,7 +48,7 @@ async def run(self): async def main(): - async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem: + async with Golem(budget=1.0, subnet_tag="public") as golem: cluster = await golem.run_service(DateService, num_instances=1) start_time = datetime.now() diff --git a/examples/utils/__init__.py b/examples/utils/__init__.py index f323f3ed9..2a6563527 100644 --- a/examples/utils/__init__.py +++ b/examples/utils/__init__.py @@ -35,7 +35,7 @@ def build_parser(description: str) -> argparse.ArgumentParser: parser.add_argument( "--payment-network", "--network", help="Payment network name, for example `rinkeby`" ) - parser.add_argument("--subnet-tag", help="Subnet name, for example `devnet-beta`") + parser.add_argument("--subnet-tag", help="Subnet name, for example `public`") parser.add_argument( "--log-file", default=str(default_log_path), diff --git a/tests/factories/props/__init__.py b/tests/factories/props/__init__.py index f0e329997..feda92246 100644 --- a/tests/factories/props/__init__.py +++ b/tests/factories/props/__init__.py @@ -8,4 +8,4 @@ class Meta: model = NodeInfo name = factory.Faker("pystr") - subnet_tag = "devnet-beta" + subnet_tag = "public" diff --git 
a/yapapi/engine.py b/yapapi/engine.py index ff5c8c3f9..c286375dc 100644 --- a/yapapi/engine.py +++ b/yapapi/engine.py @@ -53,7 +53,7 @@ DEFAULT_DRIVER: str = os.getenv("YAGNA_PAYMENT_DRIVER", "erc20").lower() DEFAULT_NETWORK: str = os.getenv("YAGNA_PAYMENT_NETWORK", "rinkeby").lower() -DEFAULT_SUBNET: Optional[str] = os.getenv("YAGNA_SUBNET", "devnet-beta") +DEFAULT_SUBNET: Optional[str] = os.getenv("YAGNA_SUBNET", "public") MAX_CONCURRENTLY_PROCESSED_DEBIT_NOTES: Final[int] = 10 diff --git a/yapapi/golem.py b/yapapi/golem.py index fd67a2745..272996b7d 100644 --- a/yapapi/golem.py +++ b/yapapi/golem.py @@ -339,7 +339,7 @@ async def worker(context: WorkContext, tasks: AsyncIterable[Task]): image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376", ) - async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem: + async with Golem(budget=1.0, subnet_tag="public") as golem: async for completed in golem.execute_tasks(worker, [Task(data=None)], payload=package): print(completed.result.stdout) @@ -435,7 +435,7 @@ async def run(self): async def main(): - async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem: + async with Golem(budget=1.0, subnet_tag="public") as golem: cluster = await golem.run_service(DateService, num_instances=1) start_time = datetime.now() From 7b12f9bbac2701ac66e507b184956c3d5e0552f2 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Mon, 14 Nov 2022 14:03:17 +0100 Subject: [PATCH 02/16] add handling for remote response timeout in http proxy --- yapapi/contrib/service/http_proxy.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/yapapi/contrib/service/http_proxy.py b/yapapi/contrib/service/http_proxy.py index 8f64ab5f5..04cb309f2 100644 --- a/yapapi/contrib/service/http_proxy.py +++ b/yapapi/contrib/service/http_proxy.py @@ -78,7 +78,18 @@ def content_received(self) -> bool: async def get_response(self) -> web.Response: while not self.content_received: ws_response = await 
self.ws.receive(self.timeout) - self.receive_data(ws_response.data) + if ws_response.data: + self.receive_data(ws_response.data) + else: + logger.error( + "Empty response data while handling a request, " + "remote status: %s, remote headers: %s", + self.status, + self.headers, + ) + return web.Response( + status=504, text="Empty response data while handling a request." + ) assert self.status return web.Response(status=self.status, headers=self.headers, body=self.content) From bddd09c638813bdec531e395a195eedce7c27b9f Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Mon, 14 Nov 2022 14:28:01 +0100 Subject: [PATCH 03/16] fix `webapp-fileupload` * remove the `run` handler * fix the logfile name --- examples/webapp_fileupload/webapp_fileupload.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/examples/webapp_fileupload/webapp_fileupload.py b/examples/webapp_fileupload/webapp_fileupload.py index 682449577..35f73df20 100755 --- a/examples/webapp_fileupload/webapp_fileupload.py +++ b/examples/webapp_fileupload/webapp_fileupload.py @@ -60,14 +60,6 @@ async def start(self): ) yield script - async def run(self): - while True: - await asyncio.sleep(5) - script = self._ctx.new_script() - script.download_file("/logs/out", "webapp_out.txt") - script.download_file("/logs/err", "webapp_err.txt") - yield script - class DbService(Service): @staticmethod @@ -191,7 +183,7 @@ def raise_exception_if_still_starting(cluster): help="The local port to listen on", ) now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S") - parser.set_defaults(log_file=f"webapp-yapapi-{now}.log") + parser.set_defaults(log_file=f"webapp-fileupload-yapapi-{now}.log") args = parser.parse_args() run_golem_example( From 1cadff73703cb726bbef79cec390f5bec9d062cb Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Tue, 15 Nov 2022 07:50:28 +0100 Subject: [PATCH 04/16] proper reporting of an ip address collision --- yapapi/services/service_runner.py | 10 +++++----- 1 file changed, 5 insertions(+), 
5 deletions(-) diff --git a/yapapi/services/service_runner.py b/yapapi/services/service_runner.py index d2ddf7b90..930224e24 100644 --- a/yapapi/services/service_runner.py +++ b/yapapi/services/service_runner.py @@ -88,7 +88,7 @@ def add_instance( network: Optional[Network] = None, network_address: Optional[str] = None, ) -> None: - """Add service the the collection of services managed by this ServiceRunner. + """Add service to the collection of services managed by this ServiceRunner. The same object should never be managed by more than one ServiceRunner. """ @@ -401,11 +401,11 @@ async def _worker(work_context: WorkContext) -> None: service._set_ctx(work_context) self._change_state(instance) # pending -> starting - if network: - service._set_network_node( - await network.add_node(work_context.provider_id, network_address) - ) try: + if network: + service._set_network_node( + await network.add_node(work_context.provider_id, network_address) + ) if not self._stopped: instance_batches = self._run_instance(instance) try: From 054f0681ff3c0d65f425f4b5a85bba9be141654d Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 16 Nov 2022 11:48:56 +0100 Subject: [PATCH 05/16] recycle IP addresses for terminated services --- yapapi/network.py | 11 +++++++++++ yapapi/rest/net.py | 3 +++ yapapi/services/service.py | 3 +++ yapapi/services/service_runner.py | 3 +++ 4 files changed, 20 insertions(+) diff --git a/yapapi/network.py b/yapapi/network.py index c362b1d07..b9e79b516 100644 --- a/yapapi/network.py +++ b/yapapi/network.py @@ -269,6 +269,17 @@ async def add_node(self, node_id: str, ip: Optional[str] = None) -> Node: return node + async def remove_node(self, node_id: str) -> None: + """Remove the node from the network. + + :param node_id: Node ID within the Golem network of the VPN node to be removed. 
+ """ + + await self._net_api.remove_node(self.network_id, node_id) + + async with self._nodes_lock: + del self._nodes[node_id] + async def _refresh_node(self, node: Node): logger.debug("refreshing node %s", node) try: diff --git a/yapapi/rest/net.py b/yapapi/rest/net.py index e6e323b86..bc5ac3ea1 100644 --- a/yapapi/rest/net.py +++ b/yapapi/rest/net.py @@ -35,3 +35,6 @@ async def add_address(self, network_id: str, ip: str): async def add_node(self, network_id: str, node_id: str, ip: str): await self._api.add_node(network_id, yan.Node(node_id, ip)) + + async def remove_node(self, network_id: str, node_id: str): + await self._api.remove_node(network_id, node_id) diff --git a/yapapi/services/service.py b/yapapi/services/service.py index e5a7a2fa4..13a792cd5 100644 --- a/yapapi/services/service.py +++ b/yapapi/services/service.py @@ -132,6 +132,9 @@ def _set_ctx(self, ctx: WorkContext) -> None: def _set_network_node(self, node: Node) -> None: self._network_node = node + def _clear_network_node(self) -> None: + self._network_node = None + def __repr__(self): class_name = type(self).__name__ state = self.state.value diff --git a/yapapi/services/service_runner.py b/yapapi/services/service_runner.py index 930224e24..6a01568e7 100644 --- a/yapapi/services/service_runner.py +++ b/yapapi/services/service_runner.py @@ -421,6 +421,9 @@ async def _worker(work_context: WorkContext) -> None: work_context.emit(events.WorkerFinished, exc_info=sys.exc_info()) raise finally: + if network and service.network_node: + await network.remove_node(work_context.provider_id) + service._clear_network_node() await self._job.engine.accept_payments_for_agreement(self._job.id, agreement.id) await self._job.agreements_pool.release_agreement(agreement.id, allow_reuse=False) From 0c29684b896cb4f206a3d1f8314fc990793b37bd Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 16 Nov 2022 13:49:46 +0100 Subject: [PATCH 06/16] add an integration test for the IP recycling issue --- pyproject.toml | 2 +- 
.../test_recycle_ip/ssh_recycle_ip.py | 100 +++++++++++++++ .../test_recycle_ip/test_recycle_ip.py | 117 ++++++++++++++++++ 3 files changed, 218 insertions(+), 1 deletion(-) create mode 100755 tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py create mode 100644 tests/goth_tests/test_recycle_ip/test_recycle_ip.py diff --git a/pyproject.toml b/pyproject.toml index eea7ba43f..94e6cc721 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -114,7 +114,7 @@ ya-market = "0.1.0" [tool.poe.tasks] test = "pytest --cov=yapapi --ignore tests/goth_tests" goth-assets = "python -m goth create-assets tests/goth_tests/assets" -goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.release-tag=pre-rel-v0.12.0-rc2 --config-path tests/goth_tests/assets/goth-config.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError" +goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=true --config-path tests/goth_tests/assets/goth-config.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError" typecheck = "mypy --check-untyped-defs --no-implicit-optional ." _codestyle_isort = "isort --check-only --diff ." _codestyle_black = "black --check --diff ." 
diff --git a/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py b/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py new file mode 100755 index 000000000..3474762b3 --- /dev/null +++ b/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +import asyncio +from datetime import timedelta +import random +import string + +from yapapi import Golem +from yapapi.contrib.service.socket_proxy import SocketProxy, SocketProxyService +from yapapi.payload import vm + + +first_time = True + + +class SshService(SocketProxyService): + remote_port = 22 + + def __init__(self, proxy: SocketProxy): + super().__init__() + self.proxy = proxy + + @staticmethod + async def get_payload(): + return await vm.repo( + image_hash="1e06505997e8bd1b9e1a00bd10d255fc6a390905e4d6840a22a79902", # ssh example + capabilities=[vm.VM_CAPS_VPN], + ) + + async def start(self): + global first_time + + async for script in super().start(): + yield script + + password = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(8)) + + script = self._ctx.new_script(timeout=timedelta(seconds=10)) + + if first_time: + first_time = False + raise Exception("intentional failure on the first run") + + script.run("/bin/bash", "-c", "syslogd") + script.run("/bin/bash", "-c", "ssh-keygen -A") + script.run("/bin/bash", "-c", f'echo -e "{password}\n{password}" | passwd') + script.run("/bin/bash", "-c", "/usr/sbin/sshd") + yield script + + server = await self.proxy.run_server(self, self.remote_port) + + print( + f"connect with:\n" + f"ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no " + f"-p {server.local_port} root@{server.local_address}" + ) + print(f"password: {password}") + + +async def main(): + async with Golem( + budget=1.0, + subnet_tag="goth", + ) as golem: + network = await golem.create_network("192.168.0.1/24") + proxy = SocketProxy(ports=[2222]) + + async with network: + cluster = await golem.run_service( + SshService, + network=network, + 
instance_params=[{"proxy": proxy}], + network_addresses=["192.168.0.2"], + ) + instances = cluster.instances + + while True: + print(instances) + print(".....", network._nodes) + try: + await asyncio.sleep(5) + except (KeyboardInterrupt, asyncio.CancelledError): + break + + await proxy.stop() + cluster.stop() + + cnt = 0 + while cnt < 3 and any(s.is_available for s in instances): + print(instances) + await asyncio.sleep(5) + cnt += 1 + + +if __name__ == "__main__": + try: + loop = asyncio.get_event_loop() + task = loop.create_task(main()) + loop.run_until_complete(task) + except KeyboardInterrupt: + pass diff --git a/tests/goth_tests/test_recycle_ip/test_recycle_ip.py b/tests/goth_tests/test_recycle_ip/test_recycle_ip.py new file mode 100644 index 000000000..e88b2a8f9 --- /dev/null +++ b/tests/goth_tests/test_recycle_ip/test_recycle_ip.py @@ -0,0 +1,117 @@ +"""An integration test scenario that runs the SSH example requestor app.""" +import asyncio +import logging +import os +from pathlib import Path +import pexpect +import pytest +import re +import signal +import sys +import time +from typing import List + +from goth.configuration import Override, load_yaml +from goth.runner import Runner +from goth.runner.log import configure_logging +from goth.runner.probe import RequestorProbe + +from goth_tests.assertions import assert_all_invoices_accepted, assert_no_errors + +logger = logging.getLogger("goth.test.recycle_ips") + +SUBNET_TAG = "goth" + + +@pytest.mark.asyncio +async def test_recycle_ip( + log_dir: Path, + project_dir: Path, + goth_config_path: Path, + config_overrides: List[Override], + ssh_verify_connection: bool, +) -> None: + + if ssh_verify_connection: + ssh_check = pexpect.spawn("/usr/bin/which ssh") + exit_code = ssh_check.wait() + if exit_code != 0: + raise ProcessLookupError( + "ssh binary not found, please install it or check your PATH. " + "You may also skip the connection check by omitting `--ssh-verify-connection`." 
+ ) + + configure_logging(log_dir) + + # This is the default configuration with 2 wasm/VM providers + goth_config = load_yaml(goth_config_path, config_overrides) + + # disable the mitm proxy used to capture the requestor agent -> daemon API calls + # because it doesn't support websockets which are needed to enable VPN connections + requestor = [c for c in goth_config.containers if c.name == "requestor"][0] + requestor.use_proxy = False + + requestor_path = str(Path(__file__).parent / "ssh_recycle_ip.py") + + runner = Runner( + base_log_dir=log_dir, + compose_config=goth_config.compose_config, + ) + + async with runner(goth_config.containers): + + requestor = runner.get_probes(probe_type=RequestorProbe)[0] + + async with requestor.run_command_on_host( + f"{requestor_path} --subnet-tag {SUBNET_TAG}", + env=os.environ, + ) as (_cmd_task, cmd_monitor, process_monitor): + cmd_monitor.add_assertion(assert_no_errors) + cmd_monitor.add_assertion(assert_all_invoices_accepted) + + ssh_connections = [] + + # A longer timeout to account for downloading a VM image + ssh_string = await cmd_monitor.wait_for_pattern("ssh -o", timeout=120) + matches = re.match("ssh -o .* -p (\\d+) (root@.*)", ssh_string) + port = matches.group(1) + address = matches.group(2) + password = re.sub("password: ", "", await cmd_monitor.wait_for_pattern("password:")) + + ssh_connections.append((port, address, password)) + + await cmd_monitor.wait_for_pattern(".*SshService running on provider", timeout=10) + logger.info("SSH service instances started") + + if not ssh_verify_connection: + logger.warning( + "Skipping SSH connection check. Use `--ssh-verify-connection` to perform it." 
+ ) + else: + for port, address, password in ssh_connections: + args = [ + "ssh", + "-o", + "UserKnownHostsFile=/dev/null", + "-o", + "StrictHostKeyChecking=no", + "-p", + port, + address, + "uname -v", + ] + + logger.debug("running ssh with: %s", args) + + ssh = pexpect.spawn(" ".join(args)) + ssh.expect("[pP]assword:", timeout=5) + ssh.sendline(password) + ssh.expect("#1-Alpine SMP", timeout=5) + ssh.expect(pexpect.EOF, timeout=5) + logger.info("Connection to port %s confirmed.", port) + + logger.info("SSH connections confirmed.") + + proc: asyncio.subprocess.Process = await process_monitor.get_process() + proc.send_signal(signal.SIGINT) + logger.info("Sent SIGINT...") From a4e6b00450f2a885a84620eb71ae1d96826310f6 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 16 Nov 2022 14:02:09 +0100 Subject: [PATCH 07/16] isort ;p goth test fix --- pyproject.toml | 2 +- tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py | 1 - tests/goth_tests/test_recycle_ip/test_recycle_ip.py | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 94e6cc721..0a06ff8ae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -114,7 +114,7 @@ ya-market = "0.1.0" [tool.poe.tasks] test = "pytest --cov=yapapi --ignore tests/goth_tests" goth-assets = "python -m goth create-assets tests/goth_tests/assets" -goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=true --config-path tests/goth_tests/assets/goth-config.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError" +goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=true --config-path tests/goth_tests/assets/goth-config-testing.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError 
--only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError" typecheck = "mypy --check-untyped-defs --no-implicit-optional ." _codestyle_isort = "isort --check-only --diff ." _codestyle_black = "black --check --diff ." diff --git a/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py b/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py index 3474762b3..3e43a5517 100755 --- a/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py +++ b/tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py @@ -8,7 +8,6 @@ from yapapi.contrib.service.socket_proxy import SocketProxy, SocketProxyService from yapapi.payload import vm - first_time = True diff --git a/tests/goth_tests/test_recycle_ip/test_recycle_ip.py b/tests/goth_tests/test_recycle_ip/test_recycle_ip.py index e88b2a8f9..115d7fe88 100644 --- a/tests/goth_tests/test_recycle_ip/test_recycle_ip.py +++ b/tests/goth_tests/test_recycle_ip/test_recycle_ip.py @@ -16,7 +16,7 @@ from goth.runner.log import configure_logging from goth.runner.probe import RequestorProbe -from goth_tests.assertions import assert_all_invoices_accepted, assert_no_errors +from goth_tests.assertions import assert_all_invoices_accepted, assert_no_errors # isort:skip logger = logging.getLogger("goth.test.recycle_ips") From 982f61a61eec089e71d5002dc7498fa6ea0b392a Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Thu, 17 Nov 2022 09:51:59 +0100 Subject: [PATCH 08/16] bump version to 0.10.0-alpha.1 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 0a06ff8ae..c6938230c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "yapapi" -version = "0.10.0-alpha.0" +version = "0.10.0-alpha.1" description = "High-level Python API for the New Golem" authors = ["Przemysław K. 
Rekucki ", "GolemFactory "] license = "LGPL-3.0-or-later" From 0347f55566c69687076a1cb37d1091e539b3eba0 Mon Sep 17 00:00:00 2001 From: pwalski <4924911+pwalski@users.noreply.github.com> Date: Thu, 24 Nov 2022 16:28:43 +0100 Subject: [PATCH 09/16] Golem signing certificate and a signature --- .../external-api-request/external_api_request.py | 6 +++--- examples/external-api-request/golem_sign.pem | 13 +++++++++++++ .../manifest.json.base64.sha256.sig | Bin 0 -> 70 bytes examples/external-api-request/sign.sh | 2 +- 4 files changed, 17 insertions(+), 4 deletions(-) create mode 100644 examples/external-api-request/golem_sign.pem create mode 100644 examples/external-api-request/manifest.json.base64.sha256.sig diff --git a/examples/external-api-request/external_api_request.py b/examples/external-api-request/external_api_request.py index da8ed7c38..fbc32c2db 100644 --- a/examples/external-api-request/external_api_request.py +++ b/examples/external-api-request/external_api_request.py @@ -22,13 +22,13 @@ async def get_payload(): manifest = open("manifest.json", "rb").read() manifest = base64.b64encode(manifest).decode("utf-8") - manifest_sig = open("manifest.json.base64.sign.sha256", "rb").read() + manifest_sig = open("manifest.json.base64.sha256.sig", "rb").read() manifest_sig = base64.b64encode(manifest_sig).decode("utf-8") manifest_sig_algorithm = "sha256" - # both DER and PEM formats are supported - manifest_cert = open("requestor.cert.der", "rb").read() + # DER, PEM and PEM chain formats are supported + manifest_cert = open("golem_sign.pem", "rb").read() manifest_cert = base64.b64encode(manifest_cert).decode("utf-8") return await vm.manifest( diff --git a/examples/external-api-request/golem_sign.pem b/examples/external-api-request/golem_sign.pem new file mode 100644 index 000000000..76be6f693 --- /dev/null +++ b/examples/external-api-request/golem_sign.pem @@ -0,0 +1,13 @@ +-----BEGIN CERTIFICATE----- +MIIB/zCCAYWgAwIBAgIUL+slWOaCypC0+VZM96VMW3lN3RkwCgYIKoZIzj0EAwIw 
+ODEWMBQGA1UECgwNR29sZW0gRmFjdG9yeTEeMBwGA1UEAwwVR29sZW0gRmFjdG9y +eSBST09UIENBMB4XDTIyMTEyMzExNDA1M1oXDTI1MTEyMjExNDA1M1owQDEWMBQG +A1UECgwNR29sZW0gRmFjdG9yeTEmMCQGA1UEAwwdR29sZW0gRmFjdG9yeSBJTlRF +Uk1FRElBVEUgQ0EwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAASSFvpQd9Q0hE33 +VcdWXx57BNYfxL82IechAhcOFZMfuxlUIh3scIrvVu7IKK/5WZEeMw3SlU7zQrHf +xDQhW7FZo2UwYzAOBgNVHQ8BAf8EBAMCAQYwEgYDVR0TAQH/BAgwBgEB/wIBATA9 +BgNVHR8BAf8EMzAxMC+gLaArhilodHRwczovL2NhLmdvbGVtLm5ldHdvcmsvY3Js +L0dPTEVNLUNBLmNybDAKBggqhkjOPQQDAgNoADBlAjAMz6iyHqudwY0WwfOGSEWu +QtJ+/x0EztMzHVKIc1sFrNFhKTm1rNozlX1k5dGaMZ0CMQDrCcCBs9XiVCjRIjsG +UqSjwyXJprSAQn4g3kFNOLCHrzOo/5IS9MZUH0lnekbaa1o= +-----END CERTIFICATE----- diff --git a/examples/external-api-request/manifest.json.base64.sha256.sig b/examples/external-api-request/manifest.json.base64.sha256.sig new file mode 100644 index 0000000000000000000000000000000000000000..f54173c184bdab33647ad39a622a7c343dabe67f GIT binary patch literal 70 zcmV-M0J;A#L;@f})#50&!v4+*{m^wh$6d?n_1aQv8vsdH`t1Mh+|7>yAacelSYUSP cN; manifest.json.base64 -openssl dgst -sha256 -sign $PRIVATE_KEY -out manifest.json.base64.sign.sha256 manifest.json.base64 +openssl dgst -sha256 -sign $PRIVATE_KEY -out manifest.json.base64.sha256.sig manifest.json.base64 From 157e11abdf3c20628df3a0b044098250f5842d52 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Mon, 28 Nov 2022 10:14:17 +0100 Subject: [PATCH 10/16] SocketProxy docs --- .gitignore | 2 + docs/sphinx/api.rst | 16 +++-- yapapi/contrib/service/socket_proxy.py | 95 +++++++++++++++++++++++++- 3 files changed, 107 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index e8958b9d1..60171ce6f 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,5 @@ poetry.lock # local config files .pre-commit-config.yaml + +/build diff --git a/docs/sphinx/api.rst b/docs/sphinx/api.rst index b47cc5e97..25fb078e5 100644 --- a/docs/sphinx/api.rst +++ b/docs/sphinx/api.rst @@ -151,10 +151,6 @@ Yapapi Contrib .. automodule:: yapapi.contrib -.. 
automodule:: yapapi.contrib.strategy.provider_filter - -.. autoclass:: yapapi.contrib.strategy.ProviderFilter - .. automodule:: yapapi.contrib.service.http_proxy .. autoclass:: yapapi.contrib.service.http_proxy.LocalHttpProxy @@ -162,3 +158,15 @@ Yapapi Contrib .. autoclass:: yapapi.contrib.service.http_proxy.HttpProxyService :members: __init__, handle_request + +.. automodule:: yapapi.contrib.service.socket_proxy + +.. autoclass:: yapapi.contrib.service.socket_proxy.SocketProxy + :members: __init__, run, run_server, stop + +.. autoclass:: yapapi.contrib.service.socket_proxy.SocketProxyService + :members: remote_ports + +.. automodule:: yapapi.contrib.strategy.provider_filter + +.. autoclass:: yapapi.contrib.strategy.ProviderFilter diff --git a/yapapi/contrib/service/socket_proxy.py b/yapapi/contrib/service/socket_proxy.py index 7f16c329f..f33362431 100644 --- a/yapapi/contrib/service/socket_proxy.py +++ b/yapapi/contrib/service/socket_proxy.py @@ -1,3 +1,14 @@ +""" +TCP socket proxy +^^^^^^^^^^^^^^^^ + +A local proxy that facilitates connections to any VPN-enabled TCP services launched on Golem +providers using yapapi's Services API. + +For usage in a complete requestor agent app, see the +`ssh `_ example in the +yapapi repository. 
+""" import abc import aiohttp import asyncio @@ -29,6 +40,7 @@ class SocketProxyService(Service, abc.ABC): """ remote_ports: List[int] + """List of remote ports to open the proxies for when using `SocketProxy.`:meth:`~SocketProxy.run`.""" class ProxyConnection: @@ -187,12 +199,14 @@ def __repr__(self): @property def app_key(self): + """The application key used to authorize access to `yagna`'s REST API.""" return ( self.service.cluster.service_runner._job.engine._api_config.app_key # type: ignore[union-attr] # noqa ) @property def instance_ws(self): + """The websocket URI for the specific remote port of the Service.""" return self.service.network_node.get_websocket_uri(self.remote_port) # type: ignore[union-attr] # noqa async def handler( @@ -204,6 +218,10 @@ async def handler( await connection.run() async def run(self): + """Run the asyncio TCP server for a single port on a Service. + + Preferred usage is through :meth:`~SocketProxy.run_server`. + """ server = await asyncio.start_server(self.handler, self.local_address, self.local_port) addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets) # type: ignore # noqa logger.info("Listening on: %s, forwarding to: %s", addrs, self.instance_ws) @@ -223,6 +241,64 @@ class SocketProxy: The connections can be routed to instances of services connected to a Golem VPN using `yapapi`'s Network API (:meth:`~yapapi.Golem.create_network`). + + Example usage:: + + class SomeService(SocketProxyService): + def __init__(self, remote_port: int = 4242): + super().__init__() + self.remote_ports = [remote_port] + + ... + + cluster = await golem.run_service( + SomeService, + network=network, + ) + + # ensure services are started + + ... + + proxy = SocketProxy(ports=[8484]) + await proxy.run(cluster) + + ... 
# connections to local port 8484 will be routed to port 4242 within the VM + + await proxy.stop() + cluster.stop() + + + Example usage directly from a Service handler:: + + class SomeOtherService(SocketProxyService): + remote_port = 22 # e.g. an SSH daemon + + def __init__(self, proxy: SocketProxy): + super().__init__() + self.proxy = proxy + + async def start(self): + # perform the initialization of the Service + + ... + + server = await self.proxy.run_server(self, self.remote_port) + + + proxy = SocketProxy(ports=[2222]) + + cluster = await golem.run_service( + SomeOtherService, + network=network, + instance_params=[{"proxy": proxy}], + ) + + ... # connections to local port 2222 will be routed to port 22 within the VM + + await proxy.stop() + cluster.stop() + """ servers: Dict[Service, Dict[int, ProxyServer]] @@ -234,6 +310,13 @@ def __init__( buffer_size: int = DEFAULT_SOCKET_BUFFER_SIZE, timeout: float = DEFAULT_TIMEOUT, ): + """Initialize the TCP socket proxy service. + + :param ports: a list of local ports that will be assigned to consecutive connections + :param address: the IP address to bind the local server to + :param buffer_size: the size of the data buffer used for the connections + :param timeout: the timeout in seconds for the response of the remote end + """ self.ports = ports self.address = address self.buffer_size = buffer_size @@ -244,7 +327,11 @@ def __init__( self._tasks: List[asyncio.Task] = list() async def run_server(self, service: Service, remote_port: int): - """Run a socket proxy for a single instance of a service.""" + """Run a socket proxy for a single port on an instance of a service. + + :param service: the service instance + :param remote_port: the remote port on which a TCP service is listening on the remote end + """ assert service.network_node, "Service must be started on a VPN." 
local_port = self._available_ports.pop(0) @@ -271,12 +358,16 @@ def get_server(self, service: Service, remote_port: int): return self.servers[service][remote_port] async def run(self, cluster: Cluster[SocketProxyService]): - """Run the proxy servers for all ports on a cluster.""" + """Run the proxy servers for all ports on a cluster. + + :param cluster: the cluster for which the proxy connections should be enabled. + """ for service in cluster.instances: for remote_port in service.remote_ports: await self.run_server(service, remote_port) async def stop(self): + """Stop servers for all connections.""" logger.info("Stopping socket proxy...") for t in self._tasks: t.cancel() From 1ce7ee34580bdaca6bedc63d5e06095b132321d7 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Mon, 28 Nov 2022 10:21:57 +0100 Subject: [PATCH 11/16] include the updated Service.restart docs in the API reference --- docs/sphinx/api.rst | 2 +- yapapi/services/service.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/sphinx/api.rst b/docs/sphinx/api.rst index 25fb078e5..6ba2b431a 100644 --- a/docs/sphinx/api.rst +++ b/docs/sphinx/api.rst @@ -27,7 +27,7 @@ Service ------- .. autoclass:: yapapi.services.Service - :members: id, provider_name, state, is_available, start, run, shutdown, reset, send_message, send_message_nowait, receive_message, receive_message_nowait, get_payload, network, network_node + :members: id, provider_name, state, is_available, start, run, shutdown, reset, is_activity_responsive, restart_condition, send_message, send_message_nowait, receive_message, receive_message_nowait, get_payload, network, network_node Cluster ------- diff --git a/yapapi/services/service.py b/yapapi/services/service.py index 13a792cd5..943397268 100644 --- a/yapapi/services/service.py +++ b/yapapi/services/service.py @@ -409,6 +409,9 @@ async def is_activity_responsive(self) -> bool: Tries to get the state activity. 
Returns True if the activity state could be queried successfully and false otherwise. + + Can be overridden in case the specific implementation of `Service` wants to implement + a more appropriate health check of the particular service. """ if not self._ctx: return False From 779b1b3455a1c8ca3deb33104ac1032fa90bc920 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Tue, 29 Nov 2022 09:04:11 +0100 Subject: [PATCH 12/16] temporarily disable the ssh connection check... :( --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c6938230c..775c513ef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -114,7 +114,8 @@ ya-market = "0.1.0" [tool.poe.tasks] test = "pytest --cov=yapapi --ignore tests/goth_tests" goth-assets = "python -m goth create-assets tests/goth_tests/assets" -goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=true --config-path tests/goth_tests/assets/goth-config-testing.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError" +#goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=true --config-path tests/goth_tests/assets/goth-config-testing.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError" +goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=true --config-path tests/goth_tests/assets/goth-config-testing.yml --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun 
urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError" typecheck = "mypy --check-untyped-defs --no-implicit-optional ." _codestyle_isort = "isort --check-only --diff ." _codestyle_black = "black --check --diff ." From c6c8170d77742b547ff9d7cff1b0bb23c06fe0f7 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Tue, 29 Nov 2022 10:52:24 +0100 Subject: [PATCH 13/16] increase the script timeout in the `webapp` example... --- examples/webapp/webapp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/webapp/webapp.py b/examples/webapp/webapp.py index 277d35646..ad08af7ab 100755 --- a/examples/webapp/webapp.py +++ b/examples/webapp/webapp.py @@ -45,7 +45,7 @@ async def start(self): async for script in super().start(): yield script - script = self._ctx.new_script(timeout=timedelta(seconds=10)) + script = self._ctx.new_script(timeout=timedelta(seconds=20)) script.run( "/bin/bash", From 9f9b5e89ed8195fa5aae2cd48fb04c78db9f806d Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Tue, 29 Nov 2022 13:18:29 +0100 Subject: [PATCH 14/16] disabling the ssh test for now --- tests/goth_tests/test_run_ssh.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/goth_tests/test_run_ssh.py b/tests/goth_tests/test_run_ssh.py index 771024ac2..ad68aa8aa 100644 --- a/tests/goth_tests/test_run_ssh.py +++ b/tests/goth_tests/test_run_ssh.py @@ -22,6 +22,7 @@ SUBNET_TAG = "goth" +@pytest.mark.skip # TODO: we probably need to fix the way the requestor processes are spawned #1020 # noqa @pytest.mark.asyncio async def test_run_ssh( log_dir: Path, From 79d61521946ec484420b8dc5b8a6d45a6a1c2113 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 30 Nov 2022 13:36:21 +0100 Subject: [PATCH 15/16] add `vm.manifest` to the documentation --- docs/sphinx/api.rst | 4 ++-- yapapi/payload/vm.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/sphinx/api.rst b/docs/sphinx/api.rst index b47cc5e97..5182c266b 100644 --- a/docs/sphinx/api.rst +++ 
b/docs/sphinx/api.rst @@ -75,11 +75,11 @@ Package .. autoclass:: yapapi.payload.package.Package -vm.repo +vm ------- .. automodule:: yapapi.payload.vm - :members: repo + :members: repo, manifest Execution control diff --git a/yapapi/payload/vm.py b/yapapi/payload/vm.py index bae8c0711..45588aaf6 100644 --- a/yapapi/payload/vm.py +++ b/yapapi/payload/vm.py @@ -122,9 +122,9 @@ async def manifest( Build a reference to application payload. :param manifest: base64 encoded Computation Payload Manifest https://handbook.golem.network/requestor-tutorials/vm-runtime/computation-payload-manifest - :param manifest_sig: an optional signature of of base64 encoded Computation Payload Manifest - :manifest_sig_algorithm: an optional signature algorithm, e.g. "sha256" - :manifest_cert: an optional base64 encoded public certificate (DER or PEM) matching key used to generate signature + :param manifest_sig: an optional signature of base64 encoded Computation Payload Manifest + :param manifest_sig_algorithm: an optional signature algorithm, e.g. "sha256" + :param manifest_cert: an optional base64 encoded public certificate (DER or PEM) matching key used to generate signature :param min_mem_gib: minimal memory required to execute application code :param min_storage_gib: minimal disk storage to execute tasks :param min_cpu_threads: minimal available logical CPU cores From a8868e18b9382751b71bb435b79fa399c240e6f1 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Fri, 2 Dec 2022 12:30:00 +0100 Subject: [PATCH 16/16] set the release version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 775c513ef..05ce7679c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "yapapi" -version = "0.10.0-alpha.1" +version = "0.10.0" description = "High-level Python API for the New Golem" authors = ["Przemysław K. 
Rekucki ", "GolemFactory "] license = "LGPL-3.0-or-later"