First attempt at removing trip & updating hazmat -> lowlevel #127

Closed
wants to merge 7 commits
8 changes: 4 additions & 4 deletions .travis.yml
@@ -25,16 +25,16 @@ matrix:
- name: "Python 3.7: multiprocessing"
python: 3.7 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="mp"
- name: "Python 3.7: trio-run-in-process"
- name: "Python 3.7: trio"
python: 3.7 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="trio_run_in_process"
env: SPAWN_BACKEND="trio"

- name: "Python 3.8: multiprocessing"
python: 3.8 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="mp"
- name: "Python 3.8: trio-run-in-process"
- name: "Python 3.8: trio"
python: 3.8 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="trio_run_in_process"
env: SPAWN_BACKEND="trio"

install:
- cd $TRAVIS_BUILD_DIR
2 changes: 1 addition & 1 deletion setup.py
@@ -39,7 +39,7 @@
],
install_requires=[
'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt',
'trio_typing', 'trio-run-in-process',
'trio_typing', 'cloudpickle',
],
tests_require=['pytest'],
python_requires=">=3.7",
10 changes: 5 additions & 5 deletions tests/conftest.py
@@ -21,7 +21,7 @@ def pytest_addoption(parser):

parser.addoption(
"--spawn-backend", action="store", dest='spawn_backend',
default='trio_run_in_process',
default='trio',
help="Processing spawning backend to use for test run",
)

@@ -34,7 +34,7 @@ def pytest_configure(config):

if backend == 'mp':
tractor._spawn.try_set_start_method('spawn')
elif backend == 'trio_run_in_process':
elif backend == 'trio':
tractor._spawn.try_set_start_method(backend)


@@ -56,7 +56,7 @@ def pytest_generate_tests(metafunc):
if not spawn_backend:
# XXX some weird windows bug with `pytest`?
spawn_backend = 'mp'
assert spawn_backend in ('mp', 'trio_run_in_process')
assert spawn_backend in ('mp', 'trio')

if 'start_method' in metafunc.fixturenames:
if spawn_backend == 'mp':
@@ -67,11 +67,11 @@
# removing XXX: the fork method is in general
# incompatible with trio's global scheduler state
methods.remove('fork')
elif spawn_backend == 'trio_run_in_process':
elif spawn_backend == 'trio':
if platform.system() == "Windows":
pytest.fail(
"Only `--spawn-backend=mp` is supported on Windows")

methods = ['trio_run_in_process']
methods = ['trio']

metafunc.parametrize("start_method", methods, scope='module')
2 changes: 1 addition & 1 deletion tests/test_cancellation.py
@@ -197,7 +197,7 @@ async def test_cancel_infinite_streamer(start_method):
],
)
@tractor_test
async def test_some_cancels_all(num_actors_and_errs, start_method):
async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
"""Verify a subset of failed subactors causes all others in
the nursery to be cancelled just like the strategy in trio.

2 changes: 1 addition & 1 deletion tractor/_actor.py
@@ -259,7 +259,7 @@ def load_modules(self) -> None:
code (if it exists).
"""
try:
if self._spawn_method == 'trio_run_in_process':
if self._spawn_method == 'trio':
parent_data = self._parent_main_data
if 'init_main_from_name' in parent_data:
_mp_fixup_main._fixup_main_from_name(
13 changes: 13 additions & 0 deletions tractor/_child.py
@@ -0,0 +1,13 @@
import sys
import trio
import cloudpickle

if __name__ == "__main__":
job = cloudpickle.load(sys.stdin.detach())

try:
result = trio.run(job)
cloudpickle.dump(sys.stdout.detach(), result)

except BaseException as err:
cloudpickle.dump(sys.stdout.detach(), err)
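For reference, a minimal runnable sketch of this kind of stdin/stdout child entrypoint, assuming the parent pipes over a cloudpickled functools.partial wrapping an async function (as the spawner further down does); note that cloudpickle.dump() takes the object first and the file-like object second:

import sys

import trio
import cloudpickle

if __name__ == "__main__":
    # binary-mode handles; .detach() unwraps the text layer around stdio
    stdin = sys.stdin.detach()
    stdout = sys.stdout.detach()

    # the parent sends a pickled functools.partial wrapping an async callable
    job = cloudpickle.load(stdin)

    try:
        # run the job to completion under trio and report the return value
        result = trio.run(job)
        cloudpickle.dump(result, stdout)
    except BaseException as err:
        # ship any failure (including cancellation) back to the parent
        cloudpickle.dump(err, stdout)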
6 changes: 4 additions & 2 deletions tractor/_entry.py
@@ -57,7 +57,7 @@ def _mp_main(
log.info(f"Actor {actor.uid} terminated")


async def _trip_main(
async def _trio_main(
actor: 'Actor',
accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None
@@ -72,7 +72,9 @@ async def _trip_main(
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
get_console_log(actor.loglevel)

log.info(f"Started new TRIP process for {actor.uid}")
log.info(f"Started new trio process for {actor.uid}")

_state._current_actor = actor

await actor._async_main(accept_addr, parent_addr=parent_addr)
log.info(f"Actor {actor.uid} terminated")
68 changes: 50 additions & 18 deletions tractor/_spawn.py
@@ -1,14 +1,18 @@
"""
Machinery for actor process spawning using multiple backends.
"""
import sys
import inspect
import subprocess
import multiprocessing as mp
import platform
from typing import Any, Dict, Optional
from functools import partial

import trio
import cloudpickle
from trio_typing import TaskStatus
from async_generator import aclosing
from async_generator import aclosing, asynccontextmanager

try:
from multiprocessing import semaphore_tracker # type: ignore
@@ -21,12 +25,12 @@
from multiprocessing import forkserver # type: ignore
from typing import Tuple

from . import _forkserver_override
from . import _forkserver_override, _child
from ._state import current_actor
from .log import get_logger
from ._portal import Portal
from ._actor import Actor, ActorFailure
from ._entry import _mp_main, _trip_main
from ._entry import _mp_main, _trio_main


log = get_logger('tractor')
@@ -41,14 +45,13 @@
_ctx = mp.get_context("spawn")

async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.WaitForSingleObject(proc.sentinel)
await trio.lowlevel.WaitForSingleObject(proc.sentinel)
else:
# *NIX systems use ``trio_run_in_process` as our default (for now)
import trio_run_in_process
_spawn_method = "trio_run_in_process"
# *NIX systems use ``trio`` primitives as our default
_spawn_method = "trio"

async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.wait_readable(proc.sentinel)
await trio.lowlevel.wait_readable(proc.sentinel)


def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
@@ -57,7 +60,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:

If the desired method is not supported this function will error. On
Windows the only supported option is the ``multiprocessing`` "spawn"
method. The default on *nix systems is ``trio_run_in_process``.
method. The default on *nix systems is ``trio``.
"""
global _ctx
global _spawn_method
@@ -69,7 +72,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:

# no Windows support for trip yet
if platform.system() != 'Windows':
methods += ['trio_run_in_process']
methods += ['trio']

if name not in methods:
raise ValueError(
@@ -78,7 +81,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
elif name == 'forkserver':
_forkserver_override.override_stdlib()
_ctx = mp.get_context(name)
elif name == 'trio_run_in_process':
elif name == 'trio':
_ctx = None
else:
_ctx = mp.get_context(name)
@@ -153,6 +156,38 @@ async def cancel_on_completion(
await portal.cancel_actor()


@asynccontextmanager
async def run_in_process(async_fn, *args, **kwargs):
Review comment from @goodboy (Owner), Jul 21, 2020:
To make this simpler why don't we just make the signature of run_in_proc() something like:

async def run_in_process(subactor, async_fn, *args, **kwargs):
    debug_args = list(subactor.uid)
    ...

I do think it would be neat to accept these over sys.argv in _child.py and verify them, just as a sanity check.
It will for sure help with debugging using pstree (or whatever).

Heck, maybe it actually makes more sense to do this without passing the Actor type at all?
In theory we can avoid passing any complex types whatsoever as long as Actor._async_main() receives the appropriate data that Actor.__init__() normally does, 🤔.

Not sure if that's overthinking it.
I wonder if that means we could avoid cloudpickle entirely?

We can maybe just use msgpack in that case and load the module / func just like Actor._get_rpc_fun() does (note it requires .load_modules() to have been run first).

Follow-up from @goodboy (Owner), Jul 21, 2020:
I think at the least in terms of dropping cloudpickle we should make an issue and tackle it later.
The multiprocessing.Process call will need to be re-configured for this as well.
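A rough sketch of what that suggestion might look like (illustrative only, not part of this diff): the subactor is passed explicitly, its uid is forwarded over sys.argv so the child shows up with a readable name in pstree, and async_fn is assumed to take the subactor as its first argument, as _trio_main does. trio.open_process, cloudpickle, and async_generator's asynccontextmanager follow this PR; the argv sanity check in the child is hypothetical.

import sys
import subprocess
from functools import partial

import trio
import cloudpickle
from async_generator import asynccontextmanager


@asynccontextmanager
async def run_in_process(subactor, async_fn, *args, **kwargs):
    # forward the (name, uuid) pair purely so the child is identifiable in `pstree`
    debug_args = list(subactor.uid)

    # assume async_fn expects the subactor as its first argument (like _trio_main)
    encoded_job = cloudpickle.dumps(partial(async_fn, subactor, *args, **kwargs))

    proc = await trio.open_process(
        [sys.executable, "-m", "tractor._child"] + debug_args,
        stdin=subprocess.PIPE,
    )
    # ship the pickled job over the child's stdin
    await proc.stdin.send_all(encoded_job)
    yield proc
    await proc.wait()


# Child side (tractor/_child.py), with the suggested sanity check, could then do:
#     name, uuid = sys.argv[1:3]
#     job = cloudpickle.load(sys.stdin.detach())
#     assert (name, uuid) == tuple(job.args[0].uid)  # job.args[0] is the subactor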

# get arguments that are actors
actors_in_args = [
arg for arg in args if isinstance(arg, Actor)
]
debug_args = []
# assume the first is the actor running the function
if len(actors_in_args) > 0:
subactor = actors_in_args[0]
debug_args.append(subactor.uid[0])
debug_args.append(subactor.uid[1])

encoded_job = cloudpickle.dumps(partial(async_fn, *args, **kwargs))
p = await trio.open_process(
[
sys.executable,
"-m",
_child.__name__
] + debug_args,
stdin=subprocess.PIPE
)

# send over func to call
await p.stdin.send_all(encoded_job)

yield p

# wait for termination
await p.wait()


async def new_proc(
name: str,
actor_nursery: 'ActorNursery', # type: ignore
@@ -174,12 +209,9 @@ async def new_proc(
subactor._spawn_method = _spawn_method

async with trio.open_nursery() as nursery:
if use_trio_run_in_process or _spawn_method == 'trio_run_in_process':
if infect_asyncio:
raise NotImplementedError("Asyncio is incompatible with trip")
# trio_run_in_process
async with trio_run_in_process.open_in_process(
_trip_main,
if use_trio_run_in_process or _spawn_method == 'trio':
async with run_in_process(
_trio_main,
subactor,
bind_addr,
parent_addr,
@@ -203,7 +235,7 @@
cancel_scope = await nursery.start(
cancel_on_completion, portal, subactor, errors)

# TRIP blocks here until process is complete
# run_in_process blocks here until process is complete
else:
# `multiprocessing`
assert _ctx
2 changes: 1 addition & 1 deletion tractor/_state.py
@@ -30,7 +30,7 @@ def __iter__(self):
def __getitem__(self, key: str):
try:
return {
'task': trio.hazmat.current_task,
'task': trio.lowlevel.current_task,
'actor': current_actor
}[key]().name
except RuntimeError:
2 changes: 1 addition & 1 deletion tractor/testing/_tractor_test.py
@@ -47,7 +47,7 @@ def wrapper(
if platform.system() == "Windows":
start_method = 'spawn'
else:
start_method = 'trio_run_in_process'
start_method = 'trio'

if 'start_method' in inspect.signature(fn).parameters:
# set of subprocess spawning backends