Implement built-in threading #2597

Merged 26 commits on Jan 5, 2022
3 changes: 3 additions & 0 deletions examples/user_guide/Deploy_and_Export.ipynb
@@ -382,6 +382,9 @@
" no index is served.\n",
" --num-procs N Number of worker processes for an app. Using 0 will\n",
" autodetect number of cores (defaults to 1)\n",
" --num-threads N Number of threads to launch in a ThreadPoolExecutor which\n",
" Panel will dispatch events to for concurrent execution on\n",
" separate cores (defaults to None).\n",
" --warm Whether to execute scripts on startup to warm up the server.\n",
" --autoreload\n",
" Whether to automatically reload user sessions when the application or any of its imports change.\n",
1 change: 1 addition & 0 deletions examples/user_guide/Overview.ipynb
@@ -170,6 +170,7 @@
"> - `js_files`: External JS files to load. Dictionary should map from exported name to the URL of the JS file.\n",
"> - `loading_spinner`: The style of the global loading indicator, e.g. 'arcs', 'bars', 'dots', 'petals'.\n",
"> - `loading_color`: The color of the global loading indicator as a hex color, e.g. #6a6a6a\n",
"> - `nthreads`: If set will start a `ThreadPoolExecutor` to dispatch events to for concurrent execution on separate cores. By default no thread pool is launched, while setting nthreads=0 launches `min(32, os.cpu_count() + 4)` threads.\n",
"> - `raw_css`: List of raw CSS strings to add to load.\n",
"> - `safe_embed`: Whether to record all set events when embedding rather than just those that are changed\n",
"> - `session_history`: If set to a non-zero value this determines the maximum length of the pn.state.session_info dictionary, which tracks information about user sessions. A value of -1 indicates an unlimited history.\n",
142 changes: 142 additions & 0 deletions examples/user_guide/Performance_and_Debugging.ipynb
@@ -46,6 +46,148 @@
"The first time the app is loaded the data will be cached and subsequent sessions will simply look up the data in the cache, speeding up the process of rendering. If you want to warm up the cache before the first user visits the application you can also provide the `--warm` argument to the `panel serve` command, which will ensure the application is initialized once on launch."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Concurrent processing\n",
"\n",
"When deploying a Panel application to be accessed by multiple users they will often access the same server simultaneously. To maintain responsiveness of the application when multiple users are interacting with it at the same time there are multiple approaches to concurrency, each with their own drawbacks and advantages:\n",
"\n",
"1. `Load balancing`: A load balancer launches multiple instances of the Panel application and distributes network traffic between them. This is ensures that users the load is distributed across multiple servers but also requires a lot configuration and resources.\n",
"2. `Multiple processes`: Launches multiple processes on a single machine, effectively load balancing across the processes. Much simpler to set up than a load balancer but you are limited by the compute and memory resources on one machine.\n",
"2. `Threading`: Attempts to distribute processing across multiple threads. Effectiveness depends on the operations being performed, I/O bound and CPU bound operations that release the GIL can easily be made concurrent in this way. \n",
"3. `AsyncIO`: Allows asynchronously processing I/O bound operations. Effective for many concurrent I/O operations but requires rewriting your application and callbacks to make use of `async`/`await` paradigm.\n",
"\n",
"### Scaling across processes\n",
"\n",
"Both load balancing and starting multiple processes effectively spin up multiple copies of the same application and distribute the load across the processes. This results in duplication and therefore significantly higher overhead (basically scaling linearly with the number of processes). In applications where you are relying on global state (e.g. the `pn.state.cache`) this can introduce significant challenges to ensure that application state stays synchronized.\n",
"\n",
"#### Load balancing\n",
"\n",
"Setting up load balancing is a huge topic dependent on the precise system you are using so we won't go into any specific implementation here. In most cases you set up a reverse proxy (like NGINX) to distribute the load across multiple application servers. If you are using a system like Kubernetes it will also handle spinning up the servers for you and can even do so dynamically depending on the amount of concurrent users to ensure that you are not wasting resources when there are fewer users.\n",
"\n",
"<figure>\n",
"<img src=\"https://www.nginx.com/wp-content/uploads/2014/07/what-is-load-balancing-diagram-NGINX-1024x518.png\" width=\"768\"></img>\n",
"<figcaption>Diagram showing concept of load balacing (NGINX)</figcaption>\n",
"</figure>\n",
"\n",
"Load balancing is the most complex approach to set up but is guaranteed to improve concurrent usage of your application since different users are not contending for access to the same process or even necessarily the same physical compute and memory resources. At the same time it is more wasteful of resources since it potentially occupies multiple machines and since each process is isolated there is no sharing of cached data or global state. \n",
"\n",
"#### Multiple processes\n",
"\n",
"Launching a Panel application on multiple processes is a effectively a simpler version of load balancing with many of the same advantages and drawbacks. One major advantage is that it is easy to set up, when deploying your application with `panel serve` simply configure `--num-procs N`, where N is the number of processes. Generally choose an `N` that is no larger than the number of processors on your machine. This still uses significantly more resources since each process has the same overhead and all processes will be contending for the same memory and compute resources. However if your application is single-threaded and you have sufficient memory this is a simple way to make your application scale.\n",
"\n",
"### Scaling within a single process\n",
"\n",
"Threading and async are both approaches to speed up processing in Python using concurrency in a single Python process. Since we can't provide a complete primer on either threading or asynchronous processing here, if you are not familiar with these concepts we recommend reading up on them before continuing. Read about [threading in Python here](https://realpython.com/intro-to-python-threading/) and [AsyncIO here](https://realpython.com/async-io-python/).\n",
"\n",
"When to use which approach cannot be answered easily and is never completely clear cut. As a general guide however use `asyncio` can scale almost arbitrarily allowing you to perform thousands or even millions of IO bound operations concurrently, while threading limits you to the number of available threads. In practice this may never actually become relevant so the other main differences are that `async` coroutines are significantly more lightweight but that you have to carefully consider accessing shared objects across threads. Using `async` coroutines makes it very clear where concurrency occurs and therefore can make it easier to avoid race conditions and avoid having to think about locking a thread to access shared objects. However, in some situations threading can also be useful for CPU intensive operations where the operation being executed [releases the GIL](https://realpython.com/python-gil/), this includes many NumPy, Pandas and Numba functions.\n",
"\n",
"### Threading\n",
"\n",
"Using threading in Panel can either be enabled manually, e.g. by managing your own thread pool and dispatching concurrent tasks to it, or it can be managed by Panel itself by setting the `config.nthreads` parameter (or equivalently by setting it with `pn.extension(nthreads=...)`. This will start a `ThreadPoolExecutor` with the specified number of threads (or if set to `0` it will set the number of threads based on your system, i.e. `min(32, os.cpu_count() + 4)`). \n",
"\n",
"Whenever an event is generated or a periodic callback fires Panel will then automatically dispatch the event to the executor. An event in this case refers to any action generated on the frontend such as the manipulation of a widget by a user or the interaction with a plot. If you are launching an application with `panel serve` you should enable this option configure this option on the CLI by setting `--num-threads`.\n",
"\n",
"To demonstrate the effect of enabling threading take this example below:\n",
"\n",
"```python\n",
"import panel as pn\n",
"\n",
"pn.extension(nthreads=2)\n",
"\n",
"def button_click(event):\n",
" print('Button clicked for the {event.new}th time.')\n",
" time.sleep(2) # Simulate long running operation\n",
" print('Finished processing {event.new}th click.')\n",
" \n",
"button = pn.widgets.Button(name='Click me!')\n",
"\n",
"button.on_click(button_click)\n",
"```\n",
"\n",
"When we click the button twice successively in a single-threaded context we will see the following output:\n",
"\n",
"```\n",
"> Button clicked for the 1th time.\n",
"... 2 second wait\n",
"> Finished processing 1th click.\n",
"> Button clicked for the 2th time.\n",
"... 2 second wait\n",
"> Finished processing 2th click.\n",
"```\n",
"\n",
"In a threaded context on the other hand the two clicks will be processed concurrently:\n",
"\n",
"```\n",
"> Button clicked for the 1th time.\n",
"> Button clicked for the 2th time.\n",
"... 2 second wait\n",
"> Finished processing 1th click.\n",
"> Finished processing 2th click.\n",
"```\n",
"\n",
"### AsyncIO\n",
"\n",
"When using Python>=3.8 you can use async callbacks wherever you would ordinarily use a regular synchronous function. For instance you can use `pn.bind` on an async function:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import aiohttp\n",
"\n",
"widget = pn.widgets.IntSlider(start=0, end=10)\n",
"\n",
"async def get_img(index):\n",
" async with aiohttp.ClientSession() as session:\n",
" async with session.get(f\"https://picsum.photos/800/300?image={index}\") as resp:\n",
" return pn.pane.JPG(await resp.read())\n",
" \n",
"pn.Column(widget, pn.bind(get_img, widget))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this example Panel will invoke the function and update the output when the function returns while leaving the process unblocked for the duration of the `aiohttp` request. \n",
"\n",
"Similarly you can attach asynchronous callbacks using `.param.watch`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"widget = pn.widgets.IntSlider(start=0, end=10)\n",
"\n",
"image = pn.pane.JPG()\n",
"\n",
"async def update_img(event):\n",
" async with aiohttp.ClientSession() as session:\n",
" async with session.get(f\"https://picsum.photos/800/300?image={event.new}\") as resp:\n",
" image.object = await resp.read()\n",
" \n",
"widget.param.watch(update_img, 'value')\n",
"widget.param.trigger('value')\n",
" \n",
"pn.Column(widget, image)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this example Param will await the asynchronous function and the image will be updated when the request completes."
]
},
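{
"cell_type": "markdown",
"metadata": {},
"source": [
"Asynchronous functions can also be scheduled as periodic callbacks, which Panel will await in the same way. A minimal sketch (the counter widget and the simulated delay are illustrative assumptions):\n",
"\n",
"```python\n",
"import asyncio\n",
"\n",
"import panel as pn\n",
"\n",
"pn.extension()\n",
"\n",
"counter = pn.widgets.StaticText(value='0')\n",
"\n",
"async def tick():\n",
"    await asyncio.sleep(0.5)  # simulate asynchronous I/O\n",
"    counter.value = str(int(counter.value) + 1)\n",
"\n",
"pn.state.add_periodic_callback(tick, period=1000)\n",
"\n",
"counter\n",
"```"
]
},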
{
"cell_type": "markdown",
"metadata": {},
9 changes: 9 additions & 0 deletions panel/command/serve.py
@@ -174,6 +174,12 @@ class Serve(_BkServe):
('--autoreload', dict(
action = 'store_true',
help = "Whether to autoreload source when script changes."
)),
('--num-threads', dict(
action = 'store',
type = int,
help = "Number of threads to start in a thread pool to which events are dispatched.",
default = None
))
)

@@ -285,6 +291,9 @@ def customize_kwargs(self, args, server_kwargs):
patterns.extend(pattern)
state.publish('session_info', state, ['session_info'])

if args.num_threads is not None:
config.nthreads = args.num_threads

if args.oauth_provider:
config.oauth_provider = args.oauth_provider
config.oauth_expiry = args.oauth_expiry_days
19 changes: 19 additions & 0 deletions panel/config.py
@@ -9,6 +9,7 @@
import os
import sys

from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from weakref import WeakKeyDictionary

@@ -125,6 +126,11 @@ class _config(_base_config):
theme = param.ObjectSelector(default='default', objects=['default', 'dark'], doc="""
The theme to apply to the selected global template.""")

nthreads = param.Integer(default=None, doc="""
When set to a non-None value a thread pool will be started.
Whenever an event arrives from the frontend it will be
dispatched to the thread pool to be processed.""")

throttled = param.Boolean(default=False, doc="""
If sliders and inputs should be throttled until release of mouse.""")

@@ -205,6 +211,19 @@ def __init__(self, **params):
if self.log_level:
panel_log_handler.setLevel(self.log_level)

@param.depends('nthreads', watch=True)
def _set_thread_pool(self):
from .io.state import state
if self.nthreads is None:
if state._thread_pool is not None:
state._thread_pool.shutdown(wait=False)
state._thread_pool = None
return
if state._thread_pool:
raise RuntimeError("Thread pool already running")
threads = self.nthreads if self.nthreads else None
state._thread_pool = ThreadPoolExecutor(max_workers=threads)

@contextmanager
def set(self, **kwargs):
values = [(k, v) for k, v in self.param.values().items() if k != 'name']
53 changes: 37 additions & 16 deletions panel/io/callbacks.py
@@ -73,26 +73,25 @@ def _update_period(self):
self.stop()
self.start()

async def _periodic_callback(self):
with edit_readonly(state):
state.busy = True
def _exec_callback(self, post=False):
from .server import set_curdoc
try:
with set_curdoc(self._doc):
cb = self.callback()
except Exception:
cb = None
if post:
self._post_callback()
return cb

def _post_callback(self):
cbname = function_name(self.callback)
if self._doc:
_periodic_logger.info(
LOG_PERIODIC_START, id(self._doc), cbname, self._counter
LOG_PERIODIC_END, id(self._doc), cbname, self._counter
)
try:
if inspect.isasyncgenfunction(self.callback) or inspect.iscoroutinefunction(self.callback):
await self.callback()
else:
self.callback()
finally:
if self._doc:
_periodic_logger.info(
LOG_PERIODIC_END, id(self._doc), cbname, self._counter
)
with edit_readonly(state):
state.busy = False
with edit_readonly(state):
state.busy = False
self._counter += 1
if self.timeout is not None:
dt = (time.time() - self._start_time) * 1000
@@ -101,6 +100,28 @@
if self._counter == self.count:
self.stop()

async def _periodic_callback(self):
with edit_readonly(state):
state.busy = True
cbname = function_name(self.callback)
if self._doc:
_periodic_logger.info(
LOG_PERIODIC_START, id(self._doc), cbname, self._counter
)
is_async = (
inspect.isasyncgenfunction(self.callback) or
inspect.iscoroutinefunction(self.callback)
)
if state._thread_pool and not is_async:
state._thread_pool.submit(self._exec_callback, True)
return
try:
cb = self._exec_callback()
if inspect.isawaitable(cb):
await cb
finally:
self._post_callback()

@property
def counter(self):
"""
13 changes: 11 additions & 2 deletions panel/io/server.py
@@ -401,6 +401,11 @@ def init_doc(doc):
if not doc.session_context:
return doc

thread = threading.current_thread()
if thread:
with set_curdoc(doc):
state._thread_id = thread.ident

session_id = doc.session_context.id
sessions = state.session_info['sessions']
if session_id not in sessions:
@@ -414,9 +419,10 @@

@contextmanager
def set_curdoc(doc):
orig_doc = state._curdoc
state.curdoc = doc
yield
state.curdoc = None
state.curdoc = orig_doc

def with_lock(func):
"""
@@ -798,7 +804,10 @@ def run(self):
bokeh_server = target(*args, **kwargs)
finally:
if isinstance(bokeh_server, Server):
bokeh_server.stop()
try:
bokeh_server.stop()
except Exception:
pass
if hasattr(self, '_target'):
del self._target, self._args, self._kwargs
else:
17 changes: 16 additions & 1 deletion panel/io/state.py
@@ -67,7 +67,8 @@ class _state(param.Parameterized):
_hold = False

# Used to ensure that events are not scheduled from the wrong thread
_thread_id = None
_thread_id_ = WeakKeyDictionary()
_thread_pool = None

_comm_manager = _CommManager

@@ -119,6 +120,14 @@ def __repr__(self):
return "state(servers=[])"
return "state(servers=[\n {}\n])".format(",\n ".join(server_info))

@property
def _thread_id(self):
return self._thread_id_.get(self.curdoc) if self.curdoc else None

@_thread_id.setter
def _thread_id(self, thread_id):
self._thread_id_[self.curdoc] = thread_id

def _unblocked(self, doc):
thread = threading.current_thread()
thread_id = thread.ident if thread else None
@@ -252,6 +261,12 @@ def add_periodic_callback(self, callback, period=500, count=None,
callback=callback, period=period, count=count, timeout=timeout
)
if start:
if self._thread_id is not None:
thread = threading.current_thread()
thread_id = thread.ident if thread else None
if self._thread_id != thread_id:
self.curdoc.add_next_tick_callback(cb.start)
return cb
cb.start()
return cb

7 changes: 6 additions & 1 deletion panel/pane/vega.py
@@ -1,5 +1,7 @@
import sys

from functools import partial

import param
import numpy as np

@@ -226,7 +228,10 @@ def _get_model(self, doc, root=None, parent=None, comm=None):
data=json, data_sources=sources, events=self._selections,
throttle=self._throttle, **props
)
model.on_event('vega_event', self._process_event)
if comm:
model.on_event('vega_event', self._comm_event)
else:
model.on_event('vega_event', partial(self._server_event, doc))
if root is None:
root = model
self._models[root.ref['id']] = (model, parent)