Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs for SocketProxy and Service.reset / Service.restart_condition #1061

Merged
merged 5 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ poetry.lock

# local config files
.pre-commit-config.yaml

/build
18 changes: 13 additions & 5 deletions docs/sphinx/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Service
-------

.. autoclass:: yapapi.services.Service
:members: id, provider_name, state, is_available, start, run, shutdown, reset, send_message, send_message_nowait, receive_message, receive_message_nowait, get_payload, network, network_node
:members: id, provider_name, state, is_available, start, run, shutdown, reset, is_activity_responsive, restart_condition, send_message, send_message_nowait, receive_message, receive_message_nowait, get_payload, network, network_node

Cluster
-------
Expand Down Expand Up @@ -151,14 +151,22 @@ Yapapi Contrib

.. automodule:: yapapi.contrib

.. automodule:: yapapi.contrib.strategy.provider_filter

.. autoclass:: yapapi.contrib.strategy.ProviderFilter

.. automodule:: yapapi.contrib.service.http_proxy

.. autoclass:: yapapi.contrib.service.http_proxy.LocalHttpProxy
:members: __init__, run, stop

.. autoclass:: yapapi.contrib.service.http_proxy.HttpProxyService
:members: __init__, handle_request

.. automodule:: yapapi.contrib.service.socket_proxy

.. autoclass:: yapapi.contrib.service.socket_proxy.SocketProxy
:members: __init__, run, run_server, stop

.. autoclass:: yapapi.contrib.service.socket_proxy.SocketProxyService
:members: remote_ports

.. automodule:: yapapi.contrib.strategy.provider_filter

.. autoclass:: yapapi.contrib.strategy.ProviderFilter
2 changes: 1 addition & 1 deletion examples/webapp/webapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async def start(self):
async for script in super().start():
yield script

script = self._ctx.new_script(timeout=timedelta(seconds=10))
script = self._ctx.new_script(timeout=timedelta(seconds=20))

script.run(
"/bin/bash",
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ ya-market = "0.1.0"
[tool.poe.tasks]
test = "pytest --cov=yapapi --ignore tests/goth_tests"
goth-assets = "python -m goth create-assets tests/goth_tests/assets"
goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=true --config-path tests/goth_tests/assets/goth-config-testing.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError"
#goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=true --config-path tests/goth_tests/assets/goth-config-testing.yml --ssh-verify-connection --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError"
goth-tests = "pytest -svx tests/goth_tests --config-override docker-compose.build-environment.use-prerelease=true --config-path tests/goth_tests/assets/goth-config-testing.yml --reruns 3 --only-rerun AssertionError --only-rerun TimeoutError --only-rerun goth.runner.exceptions.TemporalAssertionError --only-rerun urllib.error.URLError --only-rerun goth.runner.exceptions.CommandError"
typecheck = "mypy --check-untyped-defs --no-implicit-optional ."
_codestyle_isort = "isort --check-only --diff ."
_codestyle_black = "black --check --diff ."
Expand Down
1 change: 1 addition & 0 deletions tests/goth_tests/test_run_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
SUBNET_TAG = "goth"


@pytest.mark.skip # TODO: we probably need to fix the way the requestor process are spawned #1020 # noqa
@pytest.mark.asyncio
async def test_run_ssh(
log_dir: Path,
Expand Down
95 changes: 93 additions & 2 deletions yapapi/contrib/service/socket_proxy.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
"""
TCP socket proxy
^^^^^^^^^^^^^^^^

A local proxy that facilitates connections to any VPN-enabled TCP services launched on Golem
providers using yapapi's Services API.

For usage in a complete requestor agent app, see the
`ssh <https://github.com/golemfactory/yapapi/tree/master/examples/ssh>`_ example in the
yapapi repository.
"""
import abc
import aiohttp
import asyncio
Expand Down Expand Up @@ -29,6 +40,7 @@ class SocketProxyService(Service, abc.ABC):
"""

remote_ports: List[int]
"""List of remote ports to open the proxies for when using `SocketProxy.`:meth:`~SocketProxy.run`."""


class ProxyConnection:
Expand Down Expand Up @@ -187,12 +199,14 @@ def __repr__(self):

@property
def app_key(self):
"""The application key used to authorize access to `yagna`'s REST API."""
return (
self.service.cluster.service_runner._job.engine._api_config.app_key # type: ignore[union-attr] # noqa
)

@property
def instance_ws(self):
"""The websocket URI for the specific remote port of the Service."""
return self.service.network_node.get_websocket_uri(self.remote_port) # type: ignore[union-attr] # noqa

async def handler(
Expand All @@ -204,6 +218,10 @@ async def handler(
await connection.run()

async def run(self):
"""Run the asyncio TCP server for a single port on a Service.

Preferred usage is through `:meth:`~SocketProxy.run_server`.
"""
server = await asyncio.start_server(self.handler, self.local_address, self.local_port)
addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets) # type: ignore # noqa
logger.info("Listening on: %s, forwarding to: %s", addrs, self.instance_ws)
Expand All @@ -223,6 +241,64 @@ class SocketProxy:

The connections can be routed to instances of services connected to a Golem VPN
using `yapapi`'s Network API (:meth:`~yapapi.Golem.create_network`).

Example usage::

class SomeService(SocketProxyService):
def __init__(self, remote_port: int = 4242):
super().init()
self.remote_ports = [remote_port]

...

cluster = await golem.run_service(
SomeService,
network=network,
)

# ensure services are started

...

proxy = SocketProxy(ports=[8484])
await proxy.run(cluster)

... # connections to local port 8484 will be routed to port 4242 within the VM

await proxy.stop()
cluster.stop()


Example usage directly from a Service handler::

class SomeOtherService(SocketProxyService):
remote_port = 22 # e.g. an SSH daemon

def __init__(self, proxy: SocketProxy):
super().__init__()
self.proxy = proxy

async def start(self):
# perform the initialization of the Service

...

server = await self.proxy.run_server(self, self.remote_port)


proxy = SocketProxy(ports=[2222])

cluster = await golem.run_service(
SomeOtherService,
network=network,
instance_params=[{"proxy": proxy}],
)

... # connections to local port 2222 will be routed to port 22 within the VM

await proxy.stop()
cluster.stop()

"""

servers: Dict[Service, Dict[int, ProxyServer]]
Expand All @@ -234,6 +310,13 @@ def __init__(
buffer_size: int = DEFAULT_SOCKET_BUFFER_SIZE,
timeout: float = DEFAULT_TIMEOUT,
):
"""Initialize the TCP socket proxy service.

:param ports: a list of local ports that will be assigned to consecutive connections
:param address: the IP address to bind the local server to
:param buffer_size: the size of the data buffer used for the connections
:param timeout: the timeout in seconds for the response of the remote end
"""
self.ports = ports
self.address = address
self.buffer_size = buffer_size
Expand All @@ -244,7 +327,11 @@ def __init__(
self._tasks: List[asyncio.Task] = list()

async def run_server(self, service: Service, remote_port: int):
"""Run a socket proxy for a single instance of a service."""
"""Run a socket proxy for a single port on a instance of a service.

:param service: the service instance
:param remote_port: the remote port on which a TCP service is listening on the remote end
"""
assert service.network_node, "Service must be started on a VPN."

local_port = self._available_ports.pop(0)
Expand All @@ -271,12 +358,16 @@ def get_server(self, service: Service, remote_port: int):
return self.servers[service][remote_port]

async def run(self, cluster: Cluster[SocketProxyService]):
"""Run the proxy servers for all ports on a cluster."""
"""Run the proxy servers for all ports on a cluster.

:param cluster: the cluster for which the proxy connections should be enabled.
"""
for service in cluster.instances:
for remote_port in service.remote_ports:
await self.run_server(service, remote_port)

async def stop(self):
"""Stop servers for all connections."""
logger.info("Stopping socket proxy...")
for t in self._tasks:
t.cancel()
Expand Down
3 changes: 3 additions & 0 deletions yapapi/services/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,9 @@ async def is_activity_responsive(self) -> bool:

Tries to get the state activity. Returns True if the activity state could be
queried successfully and false otherwise.

Can be overridden in case the specific implementation of `Service` wants to implement
a more appropriate health check of the particular service.
"""
if not self._ctx:
return False
Expand Down