Remote multithreading stalls client ... and a pitch #491
Comments
I just finished a prototype at notEvil@46b49e3
The prototype has no issues with the following script. Current rpyc, as expected, stalls some threads yielding expired results and is much slower due to single-threaded operation. Hope this increases the chance of consideration :) (not complaining!)

```python
import rpyc
import logging
import random
import threading
import time

SLEEPLESS = False
PORT = 18812


class Service(rpyc.Service):
    def __init__(self):
        super().__init__()
        self._remote_function = None

    def on_connect(self, connection):
        self._remote_function = connection.root.function
        for _ in range(10):
            threading.Thread(target=self._worker, args=(connection,)).start()

    def _worker(self, connection):
        for _ in range(2000 if SLEEPLESS else 100):
            if not SLEEPLESS:
                logging.info(repr(_))
            self._call()

    def exposed_function(self, complexity):
        if random.random() < 0.292893219:  # (1 - _)**2 == 0.5
            self._call()

        if SLEEPLESS:
            if random.random() < 0.01:
                time.sleep(0.01)
        else:
            time.sleep(complexity)

        if random.random() < 0.292893219:
            self._call()

        return complexity

    def _call(self):
        complexity = 0 if random.random() < 0.5 else random.uniform(0, 1)
        while self._remote_function is None:
            pass
        assert self._remote_function(complexity) == complexity


server_thread = threading.Thread(
    target=rpyc.ThreadedServer(Service, port=PORT).start, daemon=True
)
server_thread.start()

connection = rpyc.connect("localhost", PORT, service=Service)
```
I'll see if I can review this soon. I just pushed out a new unit test named
Great, #492 is not really related to this issue (though the prototype doesn't need the fix by design), so I'll make it short: I still think there is no way around extending the lock to
Oops, I was connecting dots that are not there. Let us refocus... the title of this issue is "remote multithreading stalls client", and the claim in the title is inaccurate. Remote multithreading does not stall the client. In fact, your implementation sleeps on the server thread that serves the client. The client is invoking using a synchronous request, and a client waiting for a response to a synchronous request is expected behavior. I rewrote your client/server example to better illustrate what happens and here is the output
client.py

```python
import rpyc
import threading
import time


def async_example(connection):
    t0 = time.time()
    print("Running async example...")
    _async_function = rpyc.async_(connection.root.function)
    res = _async_function(threading.Event())
    print(f"Created async result after {time.time()-t0}s")
    value = res.value
    print(f"Value returned after {time.time()-t0}s: {value}")
    print()


def synchronous_example(connection):
    t0 = time.time()
    print("Running synchronous example...")
    value = connection.root.function(threading.Event())
    print(f"Value returned after {time.time()-t0}s: {value}")
    print()


if __name__ == "__main__":
    connection = rpyc.connect("localhost", 18812, config=dict(allow_public_attrs=True))
    async_example(connection)
    synchronous_example(connection)
```

server.py

```python
import rpyc
import threading
import time


class Service(rpyc.Service):
    def exposed_function(self, event):
        threading.Thread(target=event.wait).start()
        time.sleep(1)
        threading.Thread(target=event.set).start()
        return 'silly sleeps on server threads'


if __name__ == "__main__":
    rpyc.ThreadedServer(Service(), hostname="localhost", port=18812).start()
```

I hope this helps clarify the behavior you are seeing.
I'm a little puzzled by the fact that current master doesn't stall, but 7ea2d24 (a few commits back) as well as 5.1.0 do on my setup. Can you confirm? The example is rather simple and my goal was to let the server call

update:
I confirmed that master does not time out but
Concurrency in action is rarely simple imo 😆
Let's take a step back and ask a few questions about what we are trying to accomplish.
Generally, this is more a theoretical problem and the example is completely artificial. In practice one can launch as many client and server threads as necessary to get the job done, and hope that the possibility of thread A receiving a result for thread B, which is occupied with a complex request for thread C, won't yield expired results. However, some use cases may require thread-local execution and would benefit from not having to worry about threading requirements and delays (thread B).

Regarding the puzzle, I found that current master somehow doesn't process the example as I'd expect. I've changed my code to print before wait and it's never reached.

Server

```python
import rpyc
import threading
import time


class Service(rpyc.Service):
    def exposed_function(self, c):
        threading.Thread(target=c.wait).start()
        time.sleep(1)
        c.set()


rpyc.ThreadedServer(Service(), port=18812).start()
```

Client

```python
import rpyc
import threading


class C:
    def __init__(self):
        self.event = threading.Event()

    def wait(self):
        print('waiting')
        self.event.wait()
        print('done waiting')

    def set(self):
        self.event.set()


connection = rpyc.connect("localhost", 18812, config=dict(allow_public_attrs=True))
connection.root.function(C())
```
Now let's rewrite your client a little bit such that the last few lines look like

```python
# connection.root.function(C())
_asyncfunc = rpyc.async_(connection.root.function)
c = C()
res = _asyncfunc(c)
print(f'c.event internal flag is {c.event.is_set()}')
print(f'We are and value is: {res.value}')
print(f'c.event internal flag is {c.event.is_set()}')
```

The output is now

Why did I change what I did?

The code example you provided @notEvil is still expected behavior. Even so, there is likely room for improvement towards RPyC's "transparency" design, especially with respect to threading.

Edit: strikethrough added to inaccurate information
Thanks for confirming this.
Looking at
I revised my comment to strikethrough inaccurate information (like the thread spawn bit, as you pointed out). The way your example was written results in a race condition, but not due to the RPyC protocol. If we patch the server to join the
The latest version of
tl;dr RPyC netrefs treat threading as a second-class concept. What does that mean? It means that when a proxy object is constructed, it does not track the context that it is running in. So, when a proxy object interacts with another address space, there is no mechanism in place to control which thread that interaction happens under.
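The "does not track the context it is running in" point can be made concrete with a small sketch. `ThreadBoundProxy` is a hypothetical name and nothing like it exists in rpyc; it only illustrates what remembering the creating thread and rejecting cross-thread use would look like:

```python
import threading

# Hypothetical illustration (not rpyc code): a proxy that records the
# thread it was created on and rejects interactions from other threads,
# i.e. one way a proxy *could* track its calling context.
class ThreadBoundProxy:
    def __init__(self, target):
        self._target = target
        self._owner = threading.get_ident()  # remember the creating thread

    def call(self, *args, **kwargs):
        if threading.get_ident() != self._owner:
            raise RuntimeError("proxy used outside its owning thread")
        return self._target(*args, **kwargs)

proxy = ThreadBoundProxy(lambda x: x + 1)
same_thread_result = proxy.call(1)  # same thread: allowed

errors = []

def use_from_other_thread():
    try:
        proxy.call(2)
    except RuntimeError as error:
        errors.append(error)  # cross-thread use: rejected

thread = threading.Thread(target=use_from_other_thread)
thread.start()
thread.join()
print(same_thread_result, len(errors))
```

Because netrefs carry no such owner information, nothing prevents a response intended for one thread from being picked up by another.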
Thanks! I've been using notEvil@46b49e3 for weeks without any issues. It would change a lot and maybe reduce performance due to bookkeeping, so I think if it were to be added to rpyc, it should be an optional feature. Maybe add a bool/enum to the configuration dict?
Could this be related to #475?
@benjamin-kirkbride in short, no
I would be okay with it as an optional feature. I would be interested in the benchmark differences. If you want to open a PR for it, we can work through a review of the changes.
@notEvil do we have unit test coverage for notEvil@46b49e3? I greatly appreciate all of your recent contributions! Threading support has been on my radar but I've never been motivated to flesh it out; all of your time and contributions are a blessing. I'm going to do a release for RPyC (hopefully tomorrow). Then we can hopefully work on improving threading support. That way we don't have too many changes in a single release.
Almost 100%. When I remove test_gevent_server from the suite, all* tests pass in a single run. With test_gevent_server there's a fatal Python error later in the run. I guess some threads are still alive and access resources (sockets) they shouldn't.

*all except some that fail for unrelated reasons, mostly ssh config
Sure. Thanks for your support and continued efforts to maintain and improve rpyc. My contributions so far are little and few in comparison.
Finally some progress; I rethought the core idea and wrote it up: Multithreading in rpyc
Current implementation
Consequences
Proposed implementation
Advantages
Disadvantages
Reference implementation
This should be much easier to reason about than the commit. The reference implementation always spawns new threads, but can be adjusted to queue messages up instead.
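The core dispatch idea described above can be sketched in plain Python. Everything here (names, the shape of a "message") is a simplified assumption, not the reference implementation itself: each message carries the id of the remote thread it belongs to, the first message for a remote thread spawns a dedicated local worker, and later messages are queued to that same worker.

```python
import queue
import threading

workers = {}          # remote thread id -> (inbox queue, worker thread)
workers_lock = threading.Lock()
results = []          # (local thread id, message) pairs, for inspection

def worker(inbox):
    while True:
        message = inbox.get()
        if message is None:  # shutdown sentinel
            return
        results.append((threading.get_ident(), message))

def dispatch(remote_thread_id, message):
    with workers_lock:
        entry = workers.get(remote_thread_id)
        if entry is None:  # first contact: spawn the mirrored local thread
            inbox = queue.Queue()
            thread = threading.Thread(target=worker, args=(inbox,))
            thread.start()
            entry = workers[remote_thread_id] = (inbox, thread)
    entry[0].put(message)

dispatch(1, "request A")
dispatch(2, "request B")
dispatch(1, "request C")  # queued to the same worker as "request A"

for inbox, thread in workers.values():
    inbox.put(None)
    thread.join()

# every message bound to remote thread 1 ran on one local thread,
# and remote thread 2's message ran on a different one
local_thread_of = {message: tid for tid, message in results}
print(sorted(local_thread_of))
```

This preserves per-thread ordering of requests while still allowing different remote threads to proceed concurrently.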
The "new" design is essentially the same as found in notEvil@46b49e3, but I ended up with much more concise code: notEvil@6f4edd5 (100% unit test coverage). Again, it replaces the current implementation instead of being optional, because the implications are still up for debate imo. For one, thread binding can lead to deadlocks (no thread receiving), and thread spawn behavior might be undesirable (e.g. initially client and server request simultaneously, causing 1 additional thread on both sides). If you want to move the discussion to a PR, I'll open one.
Well... it's complicated. Consider the following:

```python
import bokeh.layouts as b_layouts
import bokeh.plotting as b_plotting
import numpy
import pandas
import rpyc
import rpyc.utils.factory as ru_factory
import rpyc.utils.helpers as ru_helpers
import scipy.stats as s_stats
import argparse
import pathlib
import pickle
import time
import sys

SOCKET_PATH = "/tmp/socket"


class Service(rpyc.Service):
    def __init__(self):
        super().__init__()
        self._remote_function = None

    def on_connect(self, connection):
        super().on_connect(connection)
        self._remote_function = connection.root.function
        # ru_helpers.BgServingThread(connection)  # force second thread

    def exposed_function(self, argument):
        if argument == 0:
            return 0

        return self._remote_function(argument - 1)


if len(sys.argv) == 1:  # server
    path = pathlib.Path(SOCKET_PATH)
    assert not path.exists()
    try:
        rpyc.OneShotServer(Service, socket_path=SOCKET_PATH).start()
    finally:
        path.unlink()
    sys.exit(0)

# client
depth, candidate_path, reference_path, plot_path = sys.argv[1:]
depth = int(depth)

if 0 <= depth:
    client = ru_factory.unix_connect(SOCKET_PATH, service=Service)
    n = int(1e4)
    remote_function = client.root.function

    candidate_seconds = []
    for count in range(n):
        start_time = time.monotonic()
        _ = remote_function(depth)
        seconds = time.monotonic() - start_time
        candidate_seconds.append(seconds)

    client.close()

    with open(candidate_path, "wb") as file:
        pickle.dump(candidate_seconds, file)

else:
    with open(candidate_path, "rb") as file:
        candidate_seconds = pickle.load(file)

with open(reference_path, "rb") as file:
    reference_seconds = pickle.load(file)

candidate_seconds = pandas.Series(candidate_seconds)
reference_seconds = pandas.Series(reference_seconds)

print("reference")
print(reference_seconds.describe())
print("candidate")
print(candidate_seconds.describe())
print("t-test")
print(s_stats.ttest_ind(candidate_seconds, reference_seconds, equal_var=False))

b_plotting.output_file(plot_path)
figures = []

_ = max(reference_seconds.max(), candidate_seconds.max())
figure = b_plotting.Figure(y_axis_label="seconds", y_range=(_ * -0.05, _ * 1.05))
figure.scatter(
    reference_seconds.index,
    reference_seconds.values,
    color="black",
    legend_label="reference",
)
figure.scatter(
    candidate_seconds.index, candidate_seconds.values, legend_label="candidate"
)
figure.legend.location = "top_right"
figure.legend.click_policy = "hide"
figures.append(figure)

figure = b_plotting.Figure(x_axis_label="seconds", y_axis_label="frequency")
kde = s_stats.gaussian_kde(reference_seconds.values)
x = numpy.linspace(reference_seconds.min(), reference_seconds.max(), 1001)
figure.line(x, kde(x), color="black", legend_label="reference")
kde = s_stats.gaussian_kde(candidate_seconds.values)
x = numpy.linspace(candidate_seconds.min(), candidate_seconds.max(), 1001)
figure.line(x, kde(x), legend_label="candidate")
figure.legend.location = "top_right"
figure.legend.click_policy = "hide"
figures.append(figure)

b_plotting.save(b_layouts.column(figures))
```

It's essentially a trivial function call inside a tight loop. Server and client run as separate processes communicating over a unix socket. Current rpyc produces consistent times for depth = 0 (no sub request). The new commit however does reasonably well at times and terrible at others. With CPU frequency scaling set to Performance (Schedutil by default), the times become consistent and settle somewhere around 1.5x (slower). I did some profiling with viztracer and saw rpyc polling or waiting for the event 90% of the time. I'd therefore argue that most time is lost due to thread switching, which is inherent to the design. If you know how to better pin down the cost of the design, please let me know!

edit: just pushed some changes to https://github.com/notEvil/rpyc/tree/thread_bind which replace the deterministic handover with a controlled race. This way, the currently active thread has a chance to stay active, and it usually does so for many iterations. Still, thread switching occurs and hampers performance significantly.
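As a rough, standalone way to see the cost of handing a request over between threads (unrelated to rpyc's actual code paths), the same trivial call can be timed in-thread and round-tripped through a second thread via two queues. Absolute numbers are machine-dependent; the point is only that the queue path pays for context switches on every call:

```python
import queue
import threading
import time

N = 2000

def f(x):
    return x + 1

# baseline: call the function directly in the current thread
start = time.monotonic()
for i in range(N):
    f(i)
in_thread = time.monotonic() - start

# handover: route every call through a second thread via two queues
requests, responses = queue.Queue(), queue.Queue()

def server():
    while True:
        x = requests.get()
        if x is None:  # shutdown sentinel
            return
        responses.put(f(x))

thread = threading.Thread(target=server)
thread.start()

start = time.monotonic()
for i in range(N):
    requests.put(i)
    responses.get()  # block until the other thread answered
handover = time.monotonic() - start

requests.put(None)
thread.join()

print(f"in-thread: {in_thread:.4f}s, via handover: {handover:.4f}s")
```

On typical machines the handover path is orders of magnitude slower than the direct call, which is consistent with the profiling observation that most time goes to waiting rather than work.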
Task switching impacting performance does not surprise me. @notEvil tysvm for the effort and time. Here is where my head is at...

Here were my readings of the day:

Way forward:

I would be open to @notEvil's threading changes if they were configurable or allowed users to opt in. Penalizing all projects for the sake of some projects is not very flexible. I'm open to feedback @notEvil @Jongy @tomerfiliba @coldfix et al.
This assumes that you can prevent netrefs from crossing thread boundaries. If you can, sure, but then again transparency is the first item on the feature list.

Not necessarily; in my experience, the cost of locking is insignificant in a single-threaded environment. I just added two lines to my code which stop the spawned threads as soon as they served their initial purpose, and got performance on par with current rpyc on my benchmark. (did surprise me tbh)
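The "stop the spawned threads as soon as they served their initial purpose" idea can be sketched generically. This is my reading of the approach, not the actual two-line change: a worker retires itself once its inbox stays empty for a short grace period, so short-lived bursts don't leave idle threads behind.

```python
import queue
import threading

def transient_worker(inbox, handled, idle_timeout=0.1):
    while True:
        try:
            job = inbox.get(timeout=idle_timeout)
        except queue.Empty:
            return  # idle long enough: retire this thread
        handled.append(job)

inbox = queue.Queue()
handled = []
thread = threading.Thread(target=transient_worker, args=(inbox, handled))
thread.start()
inbox.put("one job")
thread.join()  # returns once the worker retires after the idle timeout
print(handled, thread.is_alive())
```

A later request for the same remote thread would simply spawn a fresh worker, trading a small spawn cost for not keeping idle threads around.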
Please consider support for Trio in addition to asyncio; I think it deserves it.
Hi,
following the issue template: I expect the example below to work ^^
The problematic part is the assignment of incoming requests to local threads.

To make it work, rpyc obviously needs to spawn client-side threads to "mirror" server-side threads if necessary. What about

This scheme has a positive side-effect: threads don't interfere with each other anymore (no race conditions, no stalling due to shared responsibilities). Downsides: more bytes per request, and some use cases may create many idle threads (yet one thread making sync requests on many connections is fine). Async is covered by not entering the pool in the first place.
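The claimed side-effect, that mirrored threads no longer interfere, can be illustrated with a minimal mock-up (hypothetical names, no rpyc involved): with one inbox per mirrored thread, a slow request bound to thread A does not delay a request bound to thread B.

```python
import queue
import threading
import time

inboxes = {"A": queue.Queue(), "B": queue.Queue()}
done = {}

def mirror(name):
    job = inboxes[name].get()  # each mirrored thread serves only its own inbox
    job()
    done[name] = time.monotonic()

threads = [threading.Thread(target=mirror, args=(n,)) for n in inboxes]
for t in threads:
    t.start()

t0 = time.monotonic()
inboxes["A"].put(lambda: time.sleep(0.5))  # slow request bound to thread A
inboxes["B"].put(lambda: None)             # fast request bound to thread B
for t in threads:
    t.join()

print(f"B finished after {done['B'] - t0:.3f}s, A after {done['A'] - t0:.3f}s")
```

With a single shared serving thread, B's request would instead sit behind A's half-second sleep.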
Environment
Minimal example
Server:
Client: