Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recycle network IPs #1054

Merged
merged 3 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ ya-market = "0.1.0"
[tool.poe.tasks]
test = "pytest --cov=yapapi --ignore tests/goth_tests"
goth-assets = "python -m goth create-assets tests/goth_tests/assets"
goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.release-tag=pre-rel-v0.12.0-rc2 --config-path tests/goth_tests/assets/goth-config.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError"
goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=true --config-path tests/goth_tests/assets/goth-config-testing.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError"
typecheck = "mypy --check-untyped-defs --no-implicit-optional ."
_codestyle_isort = "isort --check-only --diff ."
_codestyle_black = "black --check --diff ."
Expand Down
99 changes: 99 additions & 0 deletions tests/goth_tests/test_recycle_ip/ssh_recycle_ip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#!/usr/bin/env python3
import asyncio
from datetime import timedelta
import random
import string

from yapapi import Golem
from yapapi.contrib.service.socket_proxy import SocketProxy, SocketProxyService
from yapapi.payload import vm

first_time = True


class SshService(SocketProxyService):
remote_port = 22

def __init__(self, proxy: SocketProxy):
super().__init__()
self.proxy = proxy

@staticmethod
async def get_payload():
return await vm.repo(
image_hash="1e06505997e8bd1b9e1a00bd10d255fc6a390905e4d6840a22a79902", # ssh example
capabilities=[vm.VM_CAPS_VPN],
)

async def start(self):
global first_time

async for script in super().start():
yield script

password = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(8))

script = self._ctx.new_script(timeout=timedelta(seconds=10))

if first_time:
first_time = False
raise Exception("intentional failure on the first run")

script.run("/bin/bash", "-c", "syslogd")
script.run("/bin/bash", "-c", "ssh-keygen -A")
script.run("/bin/bash", "-c", f'echo -e "{password}\n{password}" | passwd')
script.run("/bin/bash", "-c", "/usr/sbin/sshd")
yield script

server = await self.proxy.run_server(self, self.remote_port)

print(
f"connect with:\n"
f"ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no "
f"-p {server.local_port} root@{server.local_address}"
)
print(f"password: {password}")


async def main():
async with Golem(
budget=1.0,
subnet_tag="goth",
) as golem:
network = await golem.create_network("192.168.0.1/24")
proxy = SocketProxy(ports=[2222])

async with network:
cluster = await golem.run_service(
SshService,
network=network,
instance_params=[{"proxy": proxy}],
network_addresses=["192.168.0.2"],
)
instances = cluster.instances

while True:
print(instances)
print(".....", network._nodes)
try:
await asyncio.sleep(5)
except (KeyboardInterrupt, asyncio.CancelledError):
break

await proxy.stop()
cluster.stop()

cnt = 0
while cnt < 3 and any(s.is_available for s in instances):
print(instances)
await asyncio.sleep(5)
cnt += 1


if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
task = loop.create_task(main())
loop.run_until_complete(task)
except KeyboardInterrupt:
pass
117 changes: 117 additions & 0 deletions tests/goth_tests/test_recycle_ip/test_recycle_ip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""An integration test scenario that runs the SSH example requestor app."""
import asyncio
import logging
import os
from pathlib import Path
import pexpect
import pytest
import re
import signal
import sys
import time
from typing import List

from goth.configuration import Override, load_yaml
from goth.runner import Runner
from goth.runner.log import configure_logging
from goth.runner.probe import RequestorProbe

from goth_tests.assertions import assert_all_invoices_accepted, assert_no_errors # isort:skip

logger = logging.getLogger("goth.test.recycle_ips")

SUBNET_TAG = "goth"


@pytest.mark.asyncio
async def test_recycle_ip(
log_dir: Path,
project_dir: Path,
goth_config_path: Path,
config_overrides: List[Override],
ssh_verify_connection: bool,
) -> None:

if ssh_verify_connection:
ssh_check = pexpect.spawn("/usr/bin/which ssh")
exit_code = ssh_check.wait()
if exit_code != 0:
raise ProcessLookupError(
"ssh binary not found, please install it or check your PATH. "
"You may also skip the connection check by omitting `--ssh-verify-connection`."
)

configure_logging(log_dir)

# This is the default configuration with 2 wasm/VM providers
goth_config = load_yaml(goth_config_path, config_overrides)

# disable the mitm proxy used to capture the requestor agent -> daemon API calls
# because it doesn't support websockets which are neeeded to enable VPN connections
requestor = [c for c in goth_config.containers if c.name == "requestor"][0]
requestor.use_proxy = False

requestor_path = str(Path(__file__).parent / "ssh_recycle_ip.py")

runner = Runner(
base_log_dir=log_dir,
compose_config=goth_config.compose_config,
)

async with runner(goth_config.containers):

requestor = runner.get_probes(probe_type=RequestorProbe)[0]

async with requestor.run_command_on_host(
f"{requestor_path} --subnet-tag {SUBNET_TAG}",
env=os.environ,
) as (_cmd_task, cmd_monitor, process_monitor):
cmd_monitor.add_assertion(assert_no_errors)
cmd_monitor.add_assertion(assert_all_invoices_accepted)

ssh_connections = []

# A longer timeout to account for downloading a VM image
ssh_string = await cmd_monitor.wait_for_pattern("ssh -o", timeout=120)
matches = re.match("ssh -o .* -p (\\d+) (root@.*)", ssh_string)
port = matches.group(1)
address = matches.group(2)
password = re.sub("password: ", "", await cmd_monitor.wait_for_pattern("password:"))

ssh_connections.append((port, address, password))

await cmd_monitor.wait_for_pattern(".*SshService running on provider", timeout=10)
logger.info("SSH service instances started")

if not ssh_verify_connection:
logger.warning(
"Skipping SSH connection check. Use `--ssh-verify-connection` to perform it."
)
else:
for port, address, password in ssh_connections:
args = [
"ssh",
"-o",
"UserKnownHostsFile=/dev/null",
"-o",
"StrictHostKeyChecking=no",
"-p",
port,
address,
"uname -v",
]

logger.debug("running ssh with: %s", args)

ssh = pexpect.spawn(" ".join(args))
ssh.expect("[pP]assword:", timeout=5)
ssh.sendline(password)
ssh.expect("#1-Alpine SMP", timeout=5)
ssh.expect(pexpect.EOF, timeout=5)
logger.info("Connection to port %s confirmed.", port)

logger.info("SSH connections confirmed.")

proc: asyncio.subprocess.Process = await process_monitor.get_process()
proc.send_signal(signal.SIGINT)
logger.info("Sent SIGINT...")
11 changes: 11 additions & 0 deletions yapapi/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,17 @@ async def add_node(self, node_id: str, ip: Optional[str] = None) -> Node:

return node

async def remove_node(self, node_id: str) -> None:
"""Remove the node from the network.

:param node_id: Node ID within the Golem network of the VPN node to be removed.
"""

await self._net_api.remove_node(self.network_id, node_id)

async with self._nodes_lock:
del self._nodes[node_id]

async def _refresh_node(self, node: Node):
logger.debug("refreshing node %s", node)
try:
Expand Down
3 changes: 3 additions & 0 deletions yapapi/rest/net.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ async def add_address(self, network_id: str, ip: str):

async def add_node(self, network_id: str, node_id: str, ip: str):
await self._api.add_node(network_id, yan.Node(node_id, ip))

async def remove_node(self, network_id: str, node_id: str):
await self._api.remove_node(network_id, node_id)
3 changes: 3 additions & 0 deletions yapapi/services/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ def _set_ctx(self, ctx: WorkContext) -> None:
def _set_network_node(self, node: Node) -> None:
self._network_node = node

def _clear_network_node(self) -> None:
self._network_node = None

def __repr__(self):
class_name = type(self).__name__
state = self.state.value
Expand Down
3 changes: 3 additions & 0 deletions yapapi/services/service_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,9 @@ async def _worker(work_context: WorkContext) -> None:
work_context.emit(events.WorkerFinished, exc_info=sys.exc_info())
raise
finally:
if network and service.network_node:
await network.remove_node(work_context.provider_id)
service._clear_network_node()
await self._job.engine.accept_payments_for_agreement(self._job.id, agreement.id)
await self._job.agreements_pool.release_agreement(agreement.id, allow_reuse=False)

Expand Down