diff --git a/examples/user_guide/Deploy_and_Export.ipynb b/examples/user_guide/Deploy_and_Export.ipynb index f57df4f169..994d685d6b 100644 --- a/examples/user_guide/Deploy_and_Export.ipynb +++ b/examples/user_guide/Deploy_and_Export.ipynb @@ -382,6 +382,9 @@ " no index is served.\n", " --num-procs N Number of worker processes for an app. Using 0 will\n", " autodetect number of cores (defaults to 1)\n", + " --num-threads N Number of threads to launch in a ThreadPoolExecutor which\n", + " Panel will dispatch events to for concurrent execution on\n", + " separate cores (defaults to None).\n", " --warm Whether to execute scripts on startup to warm up the server.\n", " --autoreload\n", " Whether to automatically reload user sessions when the application or any of its imports change.\n", diff --git a/examples/user_guide/Overview.ipynb b/examples/user_guide/Overview.ipynb index 53b3f514de..1312dd9dec 100644 --- a/examples/user_guide/Overview.ipynb +++ b/examples/user_guide/Overview.ipynb @@ -170,6 +170,7 @@ "> - `js_files`: External JS files to load. Dictionary should map from exported name to the URL of the JS file.\n", "> - `loading_spinner`: The style of the global loading indicator, e.g. 'arcs', 'bars', 'dots', 'petals'.\n", "> - `loading_color`: The color of the global loading indicator as a hex color, e.g. #6a6a6a\n", + "> - `nthreads`: If set will start a `ThreadPoolExecutor` to dispatch events to for concurrent execution on separate cores. By default no thread pool is launched, while setting nthreads=0 launches `min(32, os.cpu_count() + 4)` threads.\n", "> - `raw_css`: List of raw CSS strings to add to load.\n", "> - `safe_embed`: Whether to record all set events when embedding rather than just those that are changed\n", "> - `session_history`: If set to a non-zero value this determines the maximum length of the pn.state.session_info dictionary, which tracks information about user sessions. A value of -1 indicates an unlimited history.\n", diff --git a/examples/user_guide/Performance_and_Debugging.ipynb b/examples/user_guide/Performance_and_Debugging.ipynb index 6d88d4c330..05d6ea9128 100644 --- a/examples/user_guide/Performance_and_Debugging.ipynb +++ b/examples/user_guide/Performance_and_Debugging.ipynb @@ -46,6 +46,148 @@ "The first time the app is loaded the data will be cached and subsequent sessions will simply look up the data in the cache, speeding up the process of rendering. If you want to warm up the cache before the first user visits the application you can also provide the `--warm` argument to the `panel serve` command, which will ensure the application is initialized once on launch." ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Concurrent processing\n", + "\n", + "When deploying a Panel application to be accessed by multiple users they will often access the same server simultaneously. To maintain responsiveness of the application when multiple users are interacting with it at the same time there are multiple approaches to concurrency, each with their own drawbacks and advantages:\n", + "\n", + "1. `Load balancing`: A load balancer launches multiple instances of the Panel application and distributes network traffic between them. This is ensures that users the load is distributed across multiple servers but also requires a lot configuration and resources.\n", + "2. `Multiple processes`: Launches multiple processes on a single machine, effectively load balancing across the processes. Much simpler to set up than a load balancer but you are limited by the compute and memory resources on one machine.\n", + "2. `Threading`: Attempts to distribute processing across multiple threads. Effectiveness depends on the operations being performed, I/O bound and CPU bound operations that release the GIL can easily be made concurrent in this way. \n", + "3. `AsyncIO`: Allows asynchronously processing I/O bound operations. Effective for many concurrent I/O operations but requires rewriting your application and callbacks to make use of `async`/`await` paradigm.\n", + "\n", + "### Scaling across processes\n", + "\n", + "Both load balancing and starting multiple processes effectively spin up multiple copies of the same application and distribute the load across the processes. This results in duplication and therefore significantly higher overhead (basically scaling linearly with the number of processes). In applications where you are relying on global state (e.g. the `pn.state.cache`) this can introduce significant challenges to ensure that application state stays synchronized.\n", + "\n", + "#### Load balancing\n", + "\n", + "Setting up load balancing is a huge topic dependent on the precise system you are using so we won't go into any specific implementation here. In most cases you set up a reverse proxy (like NGINX) to distribute the load across multiple application servers. If you are using a system like Kubernetes it will also handle spinning up the servers for you and can even do so dynamically depending on the amount of concurrent users to ensure that you are not wasting resources when there are fewer users.\n", + "\n", + "
\n", + "\n", + "
Diagram showing concept of load balacing (NGINX)
\n", + "
\n", + "\n", + "Load balancing is the most complex approach to set up but is guaranteed to improve concurrent usage of your application since different users are not contending for access to the same process or even necessarily the same physical compute and memory resources. At the same time it is more wasteful of resources since it potentially occupies multiple machines and since each process is isolated there is no sharing of cached data or global state. \n", + "\n", + "#### Multiple processes\n", + "\n", + "Launching a Panel application on multiple processes is a effectively a simpler version of load balancing with many of the same advantages and drawbacks. One major advantage is that it is easy to set up, when deploying your application with `panel serve` simply configure `--num-procs N`, where N is the number of processes. Generally choose an `N` that is no larger than the number of processors on your machine. This still uses significantly more resources since each process has the same overhead and all processes will be contending for the same memory and compute resources. However if your application is single-threaded and you have sufficient memory this is a simple way to make your application scale.\n", + "\n", + "### Scaling within a single process\n", + "\n", + "Threading and async are both approaches to speed up processing in Python using concurrency in a single Python process. Since we can't provide a complete primer on either threading or asynchronous processing here, if you are not familiar with these concepts we recommend reading up on them before continuing. Read about [threading in Python here](https://realpython.com/intro-to-python-threading/) and [AsyncIO here](https://realpython.com/async-io-python/).\n", + "\n", + "When to use which approach cannot be answered easily and is never completely clear cut. As a general guide however use `asyncio` can scale almost arbitrarily allowing you to perform thousands or even millions of IO bound operations concurrently, while threading limits you to the number of available threads. In practice this may never actually become relevant so the other main differences are that `async` coroutines are significantly more lightweight but that you have to carefully consider accessing shared objects across threads. Using `async` coroutines makes it very clear where concurrency occurs and therefore can make it easier to avoid race conditions and avoid having to think about locking a thread to access shared objects. However, in some situations threading can also be useful for CPU intensive operations where the operation being executed [releases the GIL](https://realpython.com/python-gil/), this includes many NumPy, Pandas and Numba functions.\n", + "\n", + "### Threading\n", + "\n", + "Using threading in Panel can either be enabled manually, e.g. by managing your own thread pool and dispatching concurrent tasks to it, or it can be managed by Panel itself by setting the `config.nthreads` parameter (or equivalently by setting it with `pn.extension(nthreads=...)`. This will start a `ThreadPoolExecutor` with the specified number of threads (or if set to `0` it will set the number of threads based on your system, i.e. `min(32, os.cpu_count() + 4)`). \n", + "\n", + "Whenever an event is generated or a periodic callback fires Panel will then automatically dispatch the event to the executor. An event in this case refers to any action generated on the frontend such as the manipulation of a widget by a user or the interaction with a plot. If you are launching an application with `panel serve` you should enable this option configure this option on the CLI by setting `--num-threads`.\n", + "\n", + "To demonstrate the effect of enabling threading take this example below:\n", + "\n", + "```python\n", + "import panel as pn\n", + "\n", + "pn.extension(nthreads=2)\n", + "\n", + "def button_click(event):\n", + " print('Button clicked for the {event.new}th time.')\n", + " time.sleep(2) # Simulate long running operation\n", + " print('Finished processing {event.new}th click.')\n", + " \n", + "button = pn.widgets.Button(name='Click me!')\n", + "\n", + "button.on_click(button_click)\n", + "```\n", + "\n", + "When we click the button twice successively in a single-threaded context we will see the following output:\n", + "\n", + "```\n", + "> Button clicked for the 1th time.\n", + "... 2 second wait\n", + "> Finished processing 1th click.\n", + "> Button clicked for the 2th time.\n", + "... 2 second wait\n", + "> Finished processing 2th click.\n", + "```\n", + "\n", + "In a threaded context on the other hand the two clicks will be processed concurrently:\n", + "\n", + "```\n", + "> Button clicked for the 1th time.\n", + "> Button clicked for the 2th time.\n", + "... 2 second wait\n", + "> Finished processing 1th click.\n", + "> Finished processing 2th click.\n", + "```\n", + "\n", + "### AsyncIO\n", + "\n", + "When using Python>=3.8 you can use async callbacks wherever you would ordinarily use a regular synchronous function. For instance you can use `pn.bind` on an async function:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import aiohttp\n", + "\n", + "widget = pn.widgets.IntSlider(start=0, end=10)\n", + "\n", + "async def get_img(index):\n", + " async with aiohttp.ClientSession() as session:\n", + " async with session.get(f\"https://picsum.photos/800/300?image={index}\") as resp:\n", + " return pn.pane.JPG(await resp.read())\n", + " \n", + "pn.Column(widget, pn.bind(get_img, widget))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In this example Panel will invoke the function and update the output when the function returns while leaving the process unblocked for the duration of the `aiohttp` request. \n", + "\n", + "Similarly you can attach asynchronous callbacks using `.param.watch`:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "widget = pn.widgets.IntSlider(start=0, end=10)\n", + "\n", + "image = pn.pane.JPG()\n", + "\n", + "async def update_img(event):\n", + " async with aiohttp.ClientSession() as session:\n", + " async with session.get(f\"https://picsum.photos/800/300?image={event.new}\") as resp:\n", + " image.object = await resp.read()\n", + " \n", + "widget.param.watch(update_img, 'value')\n", + "widget.param.trigger('value')\n", + " \n", + "pn.Column(widget, image)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In this example Param will await the asynchronous function and the image will be updated when the request completes." + ] + }, { "cell_type": "markdown", "metadata": {}, diff --git a/panel/command/serve.py b/panel/command/serve.py index 08b48548f5..d1c4367699 100644 --- a/panel/command/serve.py +++ b/panel/command/serve.py @@ -174,6 +174,12 @@ class Serve(_BkServe): ('--autoreload', dict( action = 'store_true', help = "Whether to autoreload source when script changes." + )), + ('--num-threads', dict( + action = 'store', + type = int, + help = "Whether to start a thread pool which events are dispatched to.", + default = None )) ) @@ -285,6 +291,9 @@ def customize_kwargs(self, args, server_kwargs): patterns.extend(pattern) state.publish('session_info', state, ['session_info']) + if args.num_threads is not None: + config.nthreads = args.num_threads + if args.oauth_provider: config.oauth_provider = args.oauth_provider config.oauth_expiry = args.oauth_expiry_days diff --git a/panel/config.py b/panel/config.py index d1d1dfb0f3..20dd222597 100644 --- a/panel/config.py +++ b/panel/config.py @@ -9,6 +9,7 @@ import os import sys +from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager from weakref import WeakKeyDictionary @@ -125,6 +126,11 @@ class _config(_base_config): theme = param.ObjectSelector(default='default', objects=['default', 'dark'], doc=""" The theme to apply to the selected global template.""") + nthreads = param.Integer(default=None, doc=""" + When set to a non-None value a thread pool will be started. + Whenever an event arrives from the frontend it will be + dispatched to the thread pool to be processed.""") + throttled = param.Boolean(default=False, doc=""" If sliders and inputs should be throttled until release of mouse.""") @@ -205,6 +211,19 @@ def __init__(self, **params): if self.log_level: panel_log_handler.setLevel(self.log_level) + @param.depends('nthreads', watch=True) + def _set_thread_pool(self): + from .io.state import state + if self.nthreads is None: + if state._thread_pool is not None: + state._thread_pool.shutdown(wait=False) + state._thread_pool = None + return + if state._thread_pool: + raise RuntimeError("Thread pool already running") + threads = self.nthreads if self.nthreads else None + state._thread_pool = ThreadPoolExecutor(max_workers=threads) + @contextmanager def set(self, **kwargs): values = [(k, v) for k, v in self.param.values().items() if k != 'name'] diff --git a/panel/io/callbacks.py b/panel/io/callbacks.py index 66b32b3d04..4e22481f56 100644 --- a/panel/io/callbacks.py +++ b/panel/io/callbacks.py @@ -73,26 +73,25 @@ def _update_period(self): self.stop() self.start() - async def _periodic_callback(self): - with edit_readonly(state): - state.busy = True + def _exec_callback(self, post=False): + from .server import set_curdoc + try: + with set_curdoc(self._doc): + cb = self.callback() + except Exception: + cb = None + if post: + self._post_callback() + return cb + + def _post_callback(self): cbname = function_name(self.callback) if self._doc: _periodic_logger.info( - LOG_PERIODIC_START, id(self._doc), cbname, self._counter + LOG_PERIODIC_END, id(self._doc), cbname, self._counter ) - try: - if inspect.isasyncgenfunction(self.callback) or inspect.iscoroutinefunction(self.callback): - await self.callback() - else: - self.callback() - finally: - if self._doc: - _periodic_logger.info( - LOG_PERIODIC_END, id(self._doc), cbname, self._counter - ) - with edit_readonly(state): - state.busy = False + with edit_readonly(state): + state.busy = False self._counter += 1 if self.timeout is not None: dt = (time.time() - self._start_time) * 1000 @@ -101,6 +100,28 @@ async def _periodic_callback(self): if self._counter == self.count: self.stop() + async def _periodic_callback(self): + with edit_readonly(state): + state.busy = True + cbname = function_name(self.callback) + if self._doc: + _periodic_logger.info( + LOG_PERIODIC_START, id(self._doc), cbname, self._counter + ) + is_async = ( + inspect.isasyncgenfunction(self.callback) or + inspect.iscoroutinefunction(self.callback) + ) + if state._thread_pool and not is_async: + state._thread_pool.submit(self._exec_callback, True) + return + try: + cb = self._exec_callback() + if inspect.isawaitable(cb): + await cb + finally: + self._post_callback() + @property def counter(self): """ diff --git a/panel/io/server.py b/panel/io/server.py index b0dd74784e..80d9511be8 100644 --- a/panel/io/server.py +++ b/panel/io/server.py @@ -401,6 +401,11 @@ def init_doc(doc): if not doc.session_context: return doc + thread = threading.current_thread() + if thread: + with set_curdoc(doc): + state._thread_id = thread.ident + session_id = doc.session_context.id sessions = state.session_info['sessions'] if session_id not in sessions: @@ -414,9 +419,10 @@ def init_doc(doc): @contextmanager def set_curdoc(doc): + orig_doc = state._curdoc state.curdoc = doc yield - state.curdoc = None + state.curdoc = orig_doc def with_lock(func): """ @@ -798,7 +804,10 @@ def run(self): bokeh_server = target(*args, **kwargs) finally: if isinstance(bokeh_server, Server): - bokeh_server.stop() + try: + bokeh_server.stop() + except Exception: + pass if hasattr(self, '_target'): del self._target, self._args, self._kwargs else: diff --git a/panel/io/state.py b/panel/io/state.py index 2f4a3c1c0b..8873866945 100644 --- a/panel/io/state.py +++ b/panel/io/state.py @@ -67,7 +67,8 @@ class _state(param.Parameterized): _hold = False # Used to ensure that events are not scheduled from the wrong thread - _thread_id = None + _thread_id_ = WeakKeyDictionary() + _thread_pool = None _comm_manager = _CommManager @@ -119,6 +120,14 @@ def __repr__(self): return "state(servers=[])" return "state(servers=[\n {}\n])".format(",\n ".join(server_info)) + @property + def _thread_id(self): + return self._thread_id_.get(self.curdoc) if self.curdoc else None + + @_thread_id.setter + def _thread_id(self, thread_id): + self._thread_id_[self.curdoc] = thread_id + def _unblocked(self, doc): thread = threading.current_thread() thread_id = thread.ident if thread else None @@ -252,6 +261,12 @@ def add_periodic_callback(self, callback, period=500, count=None, callback=callback, period=period, count=count, timeout=timeout ) if start: + if self._thread_id is not None: + thread = threading.current_thread() + thread_id = thread.ident if thread else None + if self._thread_id != thread_id: + self.curdoc.add_next_tick_callback(cb.start) + return cb cb.start() return cb diff --git a/panel/pane/vega.py b/panel/pane/vega.py index 6e7151ea61..fa3d3d7396 100644 --- a/panel/pane/vega.py +++ b/panel/pane/vega.py @@ -1,5 +1,7 @@ import sys +from functools import partial + import param import numpy as np @@ -226,7 +228,10 @@ def _get_model(self, doc, root=None, parent=None, comm=None): data=json, data_sources=sources, events=self._selections, throttle=self._throttle, **props ) - model.on_event('vega_event', self._process_event) + if comm: + model.on_event('vega_event', self._comm_event) + else: + model.on_event('vega_event', partial(self._server_event, doc)) if root is None: root = model self._models[root.ref['id']] = (model, parent) diff --git a/panel/reactive.py b/panel/reactive.py index 3f8a7ca147..19f69e53a3 100644 --- a/panel/reactive.py +++ b/panel/reactive.py @@ -9,7 +9,6 @@ import re import sys import textwrap -import threading from collections import Counter, defaultdict, namedtuple from functools import partial @@ -278,20 +277,30 @@ def _process_events(self, events): @gen.coroutine def _change_coroutine(self, doc=None): - self._change_event(doc) + if state._thread_pool: + state._thread_pool.submit(self._change_event, doc) + else: + self._change_event(doc) + + @gen.coroutine + def _event_coroutine(self, event): + if state._thread_pool: + state._thread_pool.submit(self._process_event, event) + else: + self._process_event(event) def _change_event(self, doc=None): try: state.curdoc = doc - thread = threading.current_thread() - thread_id = thread.ident if thread else None - state._thread_id = thread_id events = self._events self._events = {} self._process_events(events) finally: state.curdoc = None - state._thread_id = None + + def _schedule_change(self, doc, comm): + with hold(doc, comm=comm): + self._change_event(doc) def _comm_change(self, doc, ref, comm, subpath, attr, old, new): if subpath: @@ -300,15 +309,23 @@ def _comm_change(self, doc, ref, comm, subpath, attr, old, new): self._changing[ref].remove(attr) return - with hold(doc, comm=comm): - self._process_events({attr: new}) + self._events.update({attr: new}) + if state._thread_pool: + state._thread_pool.submit(self._schedule_change, doc, comm) + else: + self._schedule_change(doc, comm) + + def _comm_event(self, event): + if state._thread_pool: + state._thread_pool.submit(self._process_event, event) + else: + self._process_event(event) def _server_event(self, doc, event): state._locks.clear() if doc.session_context: - doc.add_timeout_callback( - partial(self._process_event, event), - self._debounce + doc.add_next_tick_callback( + partial(self._event_coroutine, event) ) else: self._process_event(event) @@ -323,14 +340,16 @@ def _server_change(self, doc, ref, subpath, attr, old, new): state._locks.clear() processing = bool(self._events) self._events.update({attr: new}) - if not processing: - if doc.session_context: - doc.add_timeout_callback( - partial(self._change_coroutine, doc), - self._debounce - ) - else: - self._change_event(doc) + if processing: + return + + if doc.session_context: + doc.add_timeout_callback( + partial(self._change_coroutine, doc), + self._debounce + ) + else: + self._change_event(doc) class Reactive(Syncable, Viewable): @@ -1510,7 +1529,7 @@ def _get_model(self, doc, root=None, parent=None, comm=None): model.children = self._get_children(doc, root, model, comm) if comm: - model.on_event('dom_event', self._process_event) + model.on_event('dom_event', self._comm_event) else: model.on_event('dom_event', partial(self._server_event, doc)) diff --git a/panel/tests/conftest.py b/panel/tests/conftest.py index b4b4ffdea3..672e34d174 100644 --- a/panel/tests/conftest.py +++ b/panel/tests/conftest.py @@ -14,6 +14,7 @@ from bokeh.client import pull_session from pyviz_comms import Comm +from panel import config from panel.pane import HTML, Markdown from panel.io import state from panel import serve @@ -88,7 +89,7 @@ def tmpdir(request, tmpdir_factory): @pytest.fixture() def html_server_session(): html = HTML('

Title

') - server = serve(html, port=5006, show=False, start=False) + server = serve(html, port=6000, show=False, start=False) session = pull_session( session_id='Test', url="http://localhost:{:d}/".format(server.port), @@ -104,7 +105,7 @@ def html_server_session(): @pytest.fixture() def markdown_server_session(): html = Markdown('#Title') - server = serve(html, port=5007, show=False, start=False) + server = serve(html, port=6001, show=False, start=False) session = pull_session( session_id='Test', url="http://localhost:{:d}/".format(server.port), @@ -151,8 +152,10 @@ def create_sessions(slugs, titles): def with_curdoc(): old_curdoc = state.curdoc state.curdoc = Document() - yield - state.curdoc = old_curdoc + try: + yield + finally: + state.curdoc = old_curdoc @contextmanager @@ -171,14 +174,30 @@ def server_cleanup(): """ Clean up after test fails """ - yield - state.kill_all_servers() - state._indicators.clear() - state._locations.clear() - + try: + yield + finally: + state.kill_all_servers() + state._indicators.clear() + state._locations.clear() + state._curdoc = None + if state._thread_pool is not None: + state._thread_pool.shutdown(wait=False) + state._thread_pool = None @pytest.fixture def py_file(): tf = tempfile.NamedTemporaryFile(mode='w', suffix='.py') - yield tf - tf.close() + try: + yield tf + finally: + tf.close() + + +@pytest.fixture +def threads(): + config.nthreads = 4 + try: + yield 4 + finally: + config.nthreads = None diff --git a/panel/tests/test_server.py b/panel/tests/test_server.py index 26ceb861de..a3803ecdf0 100644 --- a/panel/tests/test_server.py +++ b/panel/tests/test_server.py @@ -9,18 +9,20 @@ from panel.config import config from panel.io import state +from panel.layout import Row from panel.models import HTML as BkHTML +from panel.models.tabulator import TableEditEvent from panel.pane import Markdown from panel.io.resources import DIST_DIR from panel.io.server import get_server, serve, set_curdoc from panel.template import BootstrapTemplate -from panel.widgets import Button +from panel.widgets import Button, Tabulator def test_get_server(html_server_session): html, server, session = html_server_session - assert server.port == 5006 + assert server.port == 6000 root = session.document.roots[0] assert isinstance(root, BkHTML) assert root.text == '<h1>Title</h1>' @@ -50,103 +52,85 @@ def test_server_static_dirs(): html = Markdown('# Title') static = {'tests': os.path.dirname(__file__)} - server = serve(html, port=6000, threaded=True, static_dirs=static, show=False) + serve(html, port=6000, threaded=True, static_dirs=static, show=False) # Wait for server to start time.sleep(1) - try: - r = requests.get("http://localhost:6000/tests/test_server.py") - with open(__file__, encoding='utf-8') as f: - assert f.read() == r.content.decode('utf-8').replace('\r\n', '\n') - finally: - server.stop() + r = requests.get("http://localhost:6000/tests/test_server.py") + with open(__file__, encoding='utf-8') as f: + assert f.read() == r.content.decode('utf-8').replace('\r\n', '\n') def test_server_template_static_resources(): template = BootstrapTemplate() - server = serve({'template': template}, port=6001, threaded=True, show=False) + serve({'template': template}, port=6001, threaded=True, show=False) # Wait for server to start time.sleep(1) - try: - r = requests.get("http://localhost:6001/static/extensions/panel/bundled/bootstraptemplate/bootstrap.css") - with open(DIST_DIR / 'bundled' / 'bootstraptemplate' / 'bootstrap.css', encoding='utf-8') as f: - assert f.read() == r.content.decode('utf-8').replace('\r\n', '\n') - finally: - server.stop() + r = requests.get("http://localhost:6001/static/extensions/panel/bundled/bootstraptemplate/bootstrap.css") + with open(DIST_DIR / 'bundled' / 'bootstraptemplate' / 'bootstrap.css', encoding='utf-8') as f: + assert f.read() == r.content.decode('utf-8').replace('\r\n', '\n') def test_server_template_static_resources_with_prefix(): template = BootstrapTemplate() - server = serve({'template': template}, port=6004, threaded=True, show=False, prefix='prefix') + serve({'template': template}, port=6004, threaded=True, show=False, prefix='prefix') # Wait for server to start time.sleep(1) - try: - r = requests.get("http://localhost:6004/prefix/static/extensions/panel/bundled/bootstraptemplate/bootstrap.css") - with open(DIST_DIR / 'bundled' / 'bootstraptemplate' / 'bootstrap.css', encoding='utf-8') as f: - assert f.read() == r.content.decode('utf-8').replace('\r\n', '\n') - finally: - server.stop() + r = requests.get("http://localhost:6004/prefix/static/extensions/panel/bundled/bootstraptemplate/bootstrap.css") + with open(DIST_DIR / 'bundled' / 'bootstraptemplate' / 'bootstrap.css', encoding='utf-8') as f: + assert f.read() == r.content.decode('utf-8').replace('\r\n', '\n') def test_server_template_static_resources_with_prefix_relative_url(): template = BootstrapTemplate() - server = serve({'template': template}, port=6005, threaded=True, show=False, prefix='prefix') + serve({'template': template}, port=6005, threaded=True, show=False, prefix='prefix') # Wait for server to start time.sleep(1) - try: - r = requests.get("http://localhost:6005/prefix/template") - content = r.content.decode('utf-8') - assert 'href="static/extensions/panel/bundled/bootstraptemplate/bootstrap.css"' in content - finally: - server.stop() + r = requests.get("http://localhost:6005/prefix/template") + content = r.content.decode('utf-8') + assert 'href="static/extensions/panel/bundled/bootstraptemplate/bootstrap.css"' in content def test_server_template_static_resources_with_subpath_and_prefix_relative_url(): template = BootstrapTemplate() - server = serve({'/subpath/template': template}, port=6005, threaded=True, show=False, prefix='prefix') + serve({'/subpath/template': template}, port=6005, threaded=True, show=False, prefix='prefix') # Wait for server to start time.sleep(1) - try: - r = requests.get("http://localhost:6005/prefix/subpath/template") - content = r.content.decode('utf-8') - assert 'href="../static/extensions/panel/bundled/bootstraptemplate/bootstrap.css"' in content - finally: - server.stop() + r = requests.get("http://localhost:6005/prefix/subpath/template") + content = r.content.decode('utf-8') + assert 'href="../static/extensions/panel/bundled/bootstraptemplate/bootstrap.css"' in content def test_server_extensions_on_root(): html = Markdown('# Title') - server = serve(html, port=6006, threaded=True, show=False) + serve(html, port=6006, threaded=True, show=False) # Wait for server to start time.sleep(1) - try: - r = requests.get("http://localhost:6006/static/extensions/panel/css/loading.css") - assert r.ok - finally: - server.stop() + r = requests.get("http://localhost:6006/static/extensions/panel/css/loading.css") + assert r.ok def test_autoload_js(): html = Markdown('# Title') port = 6007 app_name = 'test' - server = serve({app_name: html}, port=port, threaded=True, show=False) + serve({app_name: html}, port=port, threaded=True, show=False) # Wait for server to start time.sleep(0.5) @@ -154,11 +138,8 @@ def test_autoload_js(): args = f"bokeh-autoload-element=1002&bokeh-app-path=/{app_name}&bokeh-absolute-url=http://localhost:{port}/{app_name}" r = requests.get(f"http://localhost:{port}/{app_name}/autoload.js?{args}") - try: - assert r.status_code == 200 - assert f"http://localhost:{port}/static/extensions/panel/css/alerts.css" in r.content.decode('utf-8') - finally: - server.stop() + assert r.status_code == 200 + assert f"http://localhost:{port}/static/extensions/panel/css/alerts.css" in r.content.decode('utf-8') def test_server_async_callbacks(): @@ -175,7 +156,7 @@ async def cb(event, count=[0]): button.on_click(cb) - server = serve(button, port=6002, threaded=True, show=False) + serve(button, port=6002, threaded=True, show=False) # Wait for server to start time.sleep(1) @@ -191,10 +172,7 @@ async def cb(event, count=[0]): time.sleep(2) # Ensure multiple callbacks started concurrently - try: - assert max(counts) > 1 - finally: - server.stop() + assert max(counts) > 1 def test_serve_config_per_session_state(): @@ -205,29 +183,25 @@ def app1(): def app2(): config.raw_css = [CSS2] - server1 = serve(app1, port=6004, threaded=True, show=False) - server2 = serve(app2, port=6005, threaded=True, show=False) + serve(app1, port=6004, threaded=True, show=False) + serve(app2, port=6005, threaded=True, show=False) r1 = requests.get("http://localhost:6004/").content.decode('utf-8') r2 = requests.get("http://localhost:6005/").content.decode('utf-8') - try: - assert CSS1 not in config.raw_css - assert CSS2 not in config.raw_css - assert CSS1 in r1 - assert CSS2 not in r1 - assert CSS1 not in r2 - assert CSS2 in r2 - finally: - server1.stop() - server2.stop() + assert CSS1 not in config.raw_css + assert CSS2 not in config.raw_css + assert CSS1 in r1 + assert CSS2 not in r1 + assert CSS1 not in r2 + assert CSS2 in r2 def test_server_session_info(): with config.set(session_history=-1): html = Markdown('# Title') - server = serve(html, port=6003, threaded=True, show=False) + serve(html, port=6003, threaded=True, show=False) # Wait for server to start time.sleep(1) @@ -249,7 +223,6 @@ def test_server_session_info(): state._init_session(None) assert state.session_info['live'] == 1 - server.stop() state.curdoc = None html._server_destroy(session_context) assert state.session_info['live'] == 0 @@ -257,8 +230,8 @@ def test_server_session_info(): def test_show_server_info(html_server_session, markdown_server_session): server_info = repr(state) - assert "localhost:5006 - HTML" in server_info - assert "localhost:5007 - Markdown" in server_info + assert "localhost:6000 - HTML" in server_info + assert "localhost:6001 - Markdown" in server_info def test_kill_all_servers(html_server_session, markdown_server_session): @@ -287,7 +260,114 @@ def test_serve_can_serve_panel_app_from_file(): server = get_server({"panel-app": path}) assert "/panel-app" in server._tornado.applications + def test_serve_can_serve_bokeh_app_from_file(): path = pathlib.Path(__file__).parent / "io"/"bk_app.py" server = get_server({"bk-app": path}) assert "/bk-app" in server._tornado.applications + + +def test_server_thread_pool_change_event(threads): + button = Button(name='Click') + button2 = Button(name='Click') + + counts = [] + + def cb(event, count=[0]): + count[0] += 1 + counts.append(count[0]) + time.sleep(0.5) + count[0] -= 1 + + button.on_click(cb) + button2.on_click(cb) + layout = Row(button, button2) + + port = 6010 + serve(layout, port=port, threaded=True, show=False) + + # Wait for server to start + time.sleep(1) + + requests.get(f"http://localhost:{port}/") + + model = list(layout._models.values())[0][0] + doc = model.document + with set_curdoc(doc): + button._server_change(doc, model.ref['id'], None, 'clicks', 0, 1) + button2._server_change(doc, model.ref['id'], None, 'clicks', 0, 1) + + # Wait for callbacks to be scheduled + time.sleep(1) + + # Checks whether Button on_click callback was executed concurrently + assert max(counts) == 2 + + +def test_server_thread_pool_bokeh_event(threads): + import pandas as pd + + df = pd.DataFrame([[1, 1], [2, 2]], columns=['A', 'B']) + + tabulator = Tabulator(df) + + counts = [] + + def cb(event, count=[0]): + count[0] += 1 + counts.append(count[0]) + time.sleep(0.5) + count[0] -= 1 + + tabulator.on_edit(cb) + + port = 6011 + serve(tabulator, port=port, threaded=True, show=False) + + # Wait for server to start + time.sleep(1) + + requests.get(f"http://localhost:{port}/") + + model = list(tabulator._models.values())[0][0] + doc = model.document + event = TableEditEvent(model, 'A', 0) + with set_curdoc(doc): + for _ in range(2): + tabulator._server_event(doc, event) + + # Wait for callbacks to be scheduled + time.sleep(1) + + # Checks whether Tabulator on_edit callback was executed concurrently + assert max(counts) == 2 + + +def test_server_thread_pool_periodic(threads): + button = Button(name='Click') + + counts = [] + + def cb(count=[0]): + count[0] += 1 + counts.append(count[0]) + time.sleep(0.5) + count[0] -= 1 + + port = 6012 + serve(button, port=port, threaded=True, show=False) + + # Wait for server to start + time.sleep(1) + + requests.get(f"http://localhost:{port}/") + + doc = list(button._models.values())[0][0].document + with set_curdoc(doc): + state.add_periodic_callback(cb, 100) + + # Wait for callbacks to be scheduled + time.sleep(1) + + # Checks whether periodic callbacks were executed concurrently + assert max(counts) >= 2 diff --git a/panel/tests/widgets/test_button.py b/panel/tests/widgets/test_button.py index 86309fb2be..d30c277ffb 100644 --- a/panel/tests/widgets/test_button.py +++ b/panel/tests/widgets/test_button.py @@ -1,4 +1,6 @@ -from panel.widgets import Button, Toggle +from bokeh.events import ButtonClick, MenuItemClick + +from panel.widgets import Button, MenuButton, Toggle def test_button(document, comm): @@ -9,7 +11,7 @@ def test_button(document, comm): assert isinstance(widget, button._widget_type) assert widget.label == 'Button' - button._process_events({'clicks': 1}) + button._process_event(None) assert button.clicks == 1 @@ -25,11 +27,28 @@ def callback(event): button.param.watch(callback, 'value') assert button.value == False - button._server_click(document, widget.ref['id'], None) + button._process_event(ButtonClick(widget)) assert events == [True] assert button.value == False +def test_menu_button(document, comm): + menu_items = [('Option A', 'a'), ('Option B', 'b'), ('Option C', 'c'), None, ('Help', 'help')] + menu_button = MenuButton(items=menu_items) + + widget = menu_button.get_root(document, comm=comm) + + events = [] + def callback(event): + events.append(event.new) + + menu_button.param.watch(callback, 'clicked') + + menu_button._process_event(MenuItemClick(widget, 'b')) + + assert events == ['b'] + + def test_button_jscallback_clicks(document, comm): button = Button(name='Button') code = 'console.log("Clicked!")' diff --git a/panel/tests/widgets/test_select.py b/panel/tests/widgets/test_select.py index a64ce1e822..5a4ed05474 100644 --- a/panel/tests/widgets/test_select.py +++ b/panel/tests/widgets/test_select.py @@ -73,7 +73,7 @@ def test_select_groups_list_options(document, comm): select._process_events({'value': str(groups['a'][1])}) assert select.value == groups['a'][1] - widget.value = str(groups['a'][0]) + select._process_events({'value': str(groups['a'][0])}) assert select.value == groups['a'][0] select.value = groups['a'][1] @@ -94,7 +94,7 @@ def test_select_groups_dict_options(document, comm): select._process_events({'value': str(groups['B']['c'])}) assert select.value == groups['B']['c'] - widget.value = str(groups['A']['b']) + select._process_events({'value': str(groups['A']['b'])}) assert select.value == groups['A']['b'] select.value = groups['A']['a'] diff --git a/panel/widgets/button.py b/panel/widgets/button.py index 316621da08..5b9681cadb 100644 --- a/panel/widgets/button.py +++ b/panel/widgets/button.py @@ -10,7 +10,7 @@ Button as _BkButton, Toggle as _BkToggle, Dropdown as _BkDropdown ) -from bokeh.events import (MenuItemClick, ButtonClick) +from bokeh.events import MenuItemClick, ButtonClick from .base import Widget @@ -34,8 +34,10 @@ class _ClickButton(_ButtonBase): def _get_model(self, doc, root=None, parent=None, comm=None): model = super()._get_model(doc, root, parent, comm) - ref = (root or model).ref['id'] - model.on_click(partial(self._server_click, doc, ref)) + if comm: + model.on_event(self._event, self._comm_event) + else: + model.on_event(self._event, partial(self._server_event, doc)) return model def js_on_click(self, args={}, code=""): @@ -108,21 +110,9 @@ def jslink(self, target, code=None, args=None, bidirectional=False, **links): jslink.__doc__ = Widget.jslink.__doc__ - def _server_click(self, doc, ref, event): - processing = bool(self._events) - self._events.update({"clicks": self.clicks+1}) + def _process_event(self, event): self.param.trigger('value') - if not processing: - if doc.session_context: - doc.add_timeout_callback(partial(self._change_coroutine, doc), self._debounce) - else: - self._change_event(doc) - - def _process_property_change(self, msg): - msg = super()._process_property_change(msg) - if 'clicks' in msg: - msg['clicks'] = self.clicks + 1 - return msg + self.clicks += 1 def on_click(self, callback): self.param.watch(callback, 'clicks', onlychanged=False) @@ -162,17 +152,12 @@ class MenuButton(_ClickButton): _event = 'menu_item_click' - def on_click(self, callback): - self.param.watch(callback, 'clicked', onlychanged=False) - - def _server_click(self, doc, ref, event): - processing = bool(self._events) + def _process_event(self, event): if isinstance(event, MenuItemClick): - self._events.update({"clicked": event.item}) + item = event.item elif isinstance(event, ButtonClick): - self._events.update({"clicked": self.name}) - if not processing: - if doc.session_context: - doc.add_timeout_callback(partial(self._change_coroutine, doc), self._debounce) - else: - self._change_event(doc) + item = self.name + self.clicked = item + + def on_click(self, callback): + self.param.watch(callback, 'clicked', onlychanged=False) diff --git a/panel/widgets/tables.py b/panel/widgets/tables.py index 2cfcc70b20..64401c5066 100644 --- a/panel/widgets/tables.py +++ b/panel/widgets/tables.py @@ -1282,7 +1282,7 @@ def _get_model(self, doc, root=None, parent=None, comm=None): ) self._link_props(model, ['page', 'sorters', 'expanded', 'filters'], doc, root, comm) if comm: - model.on_event('table-edit', self._process_event) + model.on_event('table-edit', self._comm_event) else: model.on_event('table-edit', partial(self._server_event, doc)) return model diff --git a/panel/widgets/terminal.py b/panel/widgets/terminal.py index 547d8fc436..d7dd43642e 100644 --- a/panel/widgets/terminal.py +++ b/panel/widgets/terminal.py @@ -283,7 +283,7 @@ def _get_model(self, doc, root=None, parent=None, comm=None): model = super()._get_model(doc, root, parent, comm) model.output = self.output if comm: - model.on_event('keystroke', self._process_event) + model.on_event('keystroke', self._comm_event) else: model.on_event('keystroke', partial(self._server_event, doc)) return model diff --git a/setup.py b/setup.py index b05ca887e4..8d417a0c8b 100644 --- a/setup.py +++ b/setup.py @@ -150,7 +150,8 @@ def run(self): 'ipyvolume', 'ipyleaflet', 'xarray', - 'pyinstrument >=4.0' + 'pyinstrument >=4.0', + 'aiohttp' ], 'tests': _tests, 'recommended': _recommended,