From dda38597098ec792cac8963e4e505b1bc3b21d6d Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Mon, 2 Aug 2021 14:53:19 +0200 Subject: [PATCH 01/26] Implement built-in threading --- panel/config.py | 17 +++++++++++++++++ panel/io/state.py | 1 + panel/reactive.py | 9 ++++++--- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/panel/config.py b/panel/config.py index d1d1dfb0f3..520b23ff96 100644 --- a/panel/config.py +++ b/panel/config.py @@ -9,6 +9,7 @@ import os import sys +from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager from weakref import WeakKeyDictionary @@ -125,6 +126,10 @@ class _config(_base_config): theme = param.ObjectSelector(default='default', objects=['default', 'dark'], doc=""" The theme to apply to the selected global template.""") + nthreads = param.Integer(default=0, doc=""" + Whether to launch a thread-pool which will be tasked with + handling user requests.""") + throttled = param.Boolean(default=False, doc=""" If sliders and inputs should be throttled until release of mouse.""") @@ -205,6 +210,18 @@ def __init__(self, **params): if self.log_level: panel_log_handler.setLevel(self.log_level) + @param.depends('nthreads', watch=True) + def _set_thread_pool(self): + if self.nthreads == 0: + if state._thread_pool is not None: + state._thread_pool.shutdown(wait=False) + state.thread_pool = None + return + from .io.state import state + if state._thread_pool: + raise RuntimeError("Thread pool already running") + state._thread_pool = ThreadPoolExecutor(max_workers=self.nthreads) + @contextmanager def set(self, **kwargs): values = [(k, v) for k, v in self.param.values().items() if k != 'name'] diff --git a/panel/io/state.py b/panel/io/state.py index 2f4a3c1c0b..6b154dca1e 100644 --- a/panel/io/state.py +++ b/panel/io/state.py @@ -68,6 +68,7 @@ class _state(param.Parameterized): # Used to ensure that events are not scheduled from the wrong thread _thread_id = None + _thread_pool = None _comm_manager = _CommManager diff --git a/panel/reactive.py b/panel/reactive.py index 3f8a7ca147..bf9f4d82ea 100644 --- a/panel/reactive.py +++ b/panel/reactive.py @@ -288,7 +288,10 @@ def _change_event(self, doc=None): state._thread_id = thread_id events = self._events self._events = {} - self._process_events(events) + if state._thread_pool: + state._thread_pool.submit(self._process_events, events) + else: + self._process_events(events) finally: state.curdoc = None state._thread_id = None @@ -300,8 +303,8 @@ def _comm_change(self, doc, ref, comm, subpath, attr, old, new): self._changing[ref].remove(attr) return - with hold(doc, comm=comm): - self._process_events({attr: new}) + self._events.update({attr: new}) + self._change_event(doc) def _server_event(self, doc, event): state._locks.clear() From 11e3c3328efb4fd89ee32dea66871d7bce5df29e Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 3 Aug 2021 12:16:39 +0200 Subject: [PATCH 02/26] Fixes for threading --- panel/config.py | 12 +++++++----- panel/reactive.py | 19 +++++++++++++------ 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/panel/config.py b/panel/config.py index 520b23ff96..23b1effe84 100644 --- a/panel/config.py +++ b/panel/config.py @@ -127,8 +127,9 @@ class _config(_base_config): The theme to apply to the selected global template.""") nthreads = param.Integer(default=0, doc=""" - Whether to launch a thread-pool which will be tasked with - handling user requests.""") + When set to a non-None value a thread pool will be started. + Whenever an event arrives from the frontend it will be + dispatched to the thread pool to be processed.""") throttled = param.Boolean(default=False, doc=""" If sliders and inputs should be throttled until release of mouse.""") @@ -212,15 +213,16 @@ def __init__(self, **params): @param.depends('nthreads', watch=True) def _set_thread_pool(self): - if self.nthreads == 0: + if self.nthreads is None: if state._thread_pool is not None: state._thread_pool.shutdown(wait=False) - state.thread_pool = None + state._thread_pool = None return from .io.state import state if state._thread_pool: raise RuntimeError("Thread pool already running") - state._thread_pool = ThreadPoolExecutor(max_workers=self.nthreads) + threads = self.nthreads if self.nthreads else None + state._thread_pool = ThreadPoolExecutor(max_workers=threads) @contextmanager def set(self, **kwargs): diff --git a/panel/reactive.py b/panel/reactive.py index bf9f4d82ea..1db82a315c 100644 --- a/panel/reactive.py +++ b/panel/reactive.py @@ -278,7 +278,10 @@ def _process_events(self, events): @gen.coroutine def _change_coroutine(self, doc=None): - self._change_event(doc) + if state._thread_pool: + state._thread_pool.submit(self._change_event, doc) + else: + self._change_event(doc) def _change_event(self, doc=None): try: @@ -288,14 +291,15 @@ def _change_event(self, doc=None): state._thread_id = thread_id events = self._events self._events = {} - if state._thread_pool: - state._thread_pool.submit(self._process_events, events) - else: - self._process_events(events) + self._process_events(events) finally: state.curdoc = None state._thread_id = None + def _schedule_change(self, doc, comm): + with hold(doc, comm=comm): + self._change_event(doc) + def _comm_change(self, doc, ref, comm, subpath, attr, old, new): if subpath: attr = f'{subpath}.{attr}' @@ -304,7 +308,10 @@ def _comm_change(self, doc, ref, comm, subpath, attr, old, new): return self._events.update({attr: new}) - self._change_event(doc) + if state._thread_pool: + state._thread_pool.submit(self._schedule_change, doc, comm) + else: + self._schedule_change(doc, comm) def _server_event(self, doc, event): state._locks.clear() From 781c80b81e4fc46dbb456491fa12d0a7e42e68bd Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 30 Nov 2021 11:56:25 +0100 Subject: [PATCH 03/26] Fix handling of main thread --- panel/io/server.py | 8 +++++++- panel/io/state.py | 12 ++++++++++-- panel/reactive.py | 4 ---- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/panel/io/server.py b/panel/io/server.py index b0dd74784e..ad9a2010af 100644 --- a/panel/io/server.py +++ b/panel/io/server.py @@ -401,6 +401,11 @@ def init_doc(doc): if not doc.session_context: return doc + thread = threading.current_thread() + if thread: + with set_curdoc(doc): + state._main_thread = thread.ident + session_id = doc.session_context.id sessions = state.session_info['sessions'] if session_id not in sessions: @@ -414,9 +419,10 @@ def init_doc(doc): @contextmanager def set_curdoc(doc): + orig_doc = state._curdoc state.curdoc = doc yield - state.curdoc = None + state.curdoc = orig_doc def with_lock(func): """ diff --git a/panel/io/state.py b/panel/io/state.py index 6b154dca1e..8e79f6ec58 100644 --- a/panel/io/state.py +++ b/panel/io/state.py @@ -67,7 +67,7 @@ class _state(param.Parameterized): _hold = False # Used to ensure that events are not scheduled from the wrong thread - _thread_id = None + _main_thread_ = WeakKeyDictionary() _thread_pool = None _comm_manager = _CommManager @@ -120,10 +120,18 @@ def __repr__(self): return "state(servers=[])" return "state(servers=[\n {}\n])".format(",\n ".join(server_info)) + @property + def _main_thread(self): + return self._main_thread_[self.curdoc] + + @_main_thread.setter + def _main_thread(self, thread_id): + self._main_thread_[self.curdoc] = thread_id + def _unblocked(self, doc): thread = threading.current_thread() thread_id = thread.ident if thread else None - return doc is self.curdoc and self._thread_id == thread_id + return doc is self.curdoc and self._main_thread == thread_id @param.depends('busy', watch=True) def _update_busy(self): diff --git a/panel/reactive.py b/panel/reactive.py index 1db82a315c..41729b5899 100644 --- a/panel/reactive.py +++ b/panel/reactive.py @@ -286,15 +286,11 @@ def _change_coroutine(self, doc=None): def _change_event(self, doc=None): try: state.curdoc = doc - thread = threading.current_thread() - thread_id = thread.ident if thread else None - state._thread_id = thread_id events = self._events self._events = {} self._process_events(events) finally: state.curdoc = None - state._thread_id = None def _schedule_change(self, doc, comm): with hold(doc, comm=comm): From c360729923af0da2466fe0c6ef9de1d0acf11036 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 30 Nov 2021 11:56:53 +0100 Subject: [PATCH 04/26] Allow setting num_threads as CLI option --- panel/command/serve.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/panel/command/serve.py b/panel/command/serve.py index 08b48548f5..3482cac7e3 100644 --- a/panel/command/serve.py +++ b/panel/command/serve.py @@ -174,6 +174,12 @@ class Serve(_BkServe): ('--autoreload', dict( action = 'store_true', help = "Whether to autoreload source when script changes." + )), + ('--num-threads', dict( + action = 'store', + type = int, + help = "Whether to start a thread pool which events are dispatched to.", + default = -1 )) ) @@ -285,6 +291,9 @@ def customize_kwargs(self, args, server_kwargs): patterns.extend(pattern) state.publish('session_info', state, ['session_info']) + if args.num_threads != -1: + config.nthreads = args.num_threads + if args.oauth_provider: config.oauth_provider = args.oauth_provider config.oauth_expiry = args.oauth_expiry_days From 8a94c0472771a8d46c79ae4593cf78eb25d5a5db Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 30 Nov 2021 13:23:18 +0100 Subject: [PATCH 05/26] Enable threading on periodic callbacks --- panel/io/callbacks.py | 54 ++++++++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 16 deletions(-) diff --git a/panel/io/callbacks.py b/panel/io/callbacks.py index 66b32b3d04..b1b57efe05 100644 --- a/panel/io/callbacks.py +++ b/panel/io/callbacks.py @@ -73,26 +73,25 @@ def _update_period(self): self.stop() self.start() - async def _periodic_callback(self): - with edit_readonly(state): - state.busy = True + def _exec_callback(self, post=False): + from .server import set_curdoc + try: + with set_curdoc(self._doc): + cb = self.callback() + except Exception: + cb = None + if post: + self._post_callback() + return cb + + def _post_callback(self): cbname = function_name(self.callback) if self._doc: _periodic_logger.info( - LOG_PERIODIC_START, id(self._doc), cbname, self._counter + LOG_PERIODIC_END, id(self._doc), cbname, self._counter ) - try: - if inspect.isasyncgenfunction(self.callback) or inspect.iscoroutinefunction(self.callback): - await self.callback() - else: - self.callback() - finally: - if self._doc: - _periodic_logger.info( - LOG_PERIODIC_END, id(self._doc), cbname, self._counter - ) - with edit_readonly(state): - state.busy = False + with edit_readonly(state): + state.busy = False self._counter += 1 if self.timeout is not None: dt = (time.time() - self._start_time) * 1000 @@ -101,6 +100,29 @@ async def _periodic_callback(self): if self._counter == self.count: self.stop() + async def _periodic_callback(self): + with edit_readonly(state): + state.busy = True + cbname = function_name(self.callback) + if self._doc: + _periodic_logger.info( + LOG_PERIODIC_START, id(self._doc), cbname, self._counter + ) + is_async = ( + inspect.isasyncgenfunction(self.callback) or + inspect.iscoroutinefunction(self.callback) + ) + if state._thread_pool and not is_async: + from tornado.ioloop import IOLoop + state._thread_pool.submit(self._exec_callback, True) + return + try: + cb = self._exec_callback() + if inspect.isawaitable(cb): + await cb + finally: + self._post_callback() + @property def counter(self): """ From 390aaa2c015c55ff092cc4f708c23e8898e17784 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 30 Nov 2021 14:22:13 +0100 Subject: [PATCH 06/26] Add docs --- examples/user_guide/Deploy_and_Export.ipynb | 1 + examples/user_guide/Overview.ipynb | 1 + .../Performance_and_Debugging.ipynb | 65 +++++++++++++++++++ 3 files changed, 67 insertions(+) diff --git a/examples/user_guide/Deploy_and_Export.ipynb b/examples/user_guide/Deploy_and_Export.ipynb index f57df4f169..f881ea1e0f 100644 --- a/examples/user_guide/Deploy_and_Export.ipynb +++ b/examples/user_guide/Deploy_and_Export.ipynb @@ -382,6 +382,7 @@ " no index is served.\n", " --num-procs N Number of worker processes for an app. Using 0 will\n", " autodetect number of cores (defaults to 1)\n", + " --num-threads N Number of threads to launch in a ThreadPoolExecutor which Panel will dispatch events to. \n", " --warm Whether to execute scripts on startup to warm up the server.\n", " --autoreload\n", " Whether to automatically reload user sessions when the application or any of its imports change.\n", diff --git a/examples/user_guide/Overview.ipynb b/examples/user_guide/Overview.ipynb index 53b3f514de..ab2a05b6a3 100644 --- a/examples/user_guide/Overview.ipynb +++ b/examples/user_guide/Overview.ipynb @@ -170,6 +170,7 @@ "> - `js_files`: External JS files to load. Dictionary should map from exported name to the URL of the JS file.\n", "> - `loading_spinner`: The style of the global loading indicator, e.g. 'arcs', 'bars', 'dots', 'petals'.\n", "> - `loading_color`: The color of the global loading indicator as a hex color, e.g. #6a6a6a\n", + "> - `nthreads`: If set will start a `ThreadPoolExecutor` to dispatch events to.\n", "> - `raw_css`: List of raw CSS strings to add to load.\n", "> - `safe_embed`: Whether to record all set events when embedding rather than just those that are changed\n", "> - `session_history`: If set to a non-zero value this determines the maximum length of the pn.state.session_info dictionary, which tracks information about user sessions. A value of -1 indicates an unlimited history.\n", diff --git a/examples/user_guide/Performance_and_Debugging.ipynb b/examples/user_guide/Performance_and_Debugging.ipynb index 6d88d4c330..c4edf489ef 100644 --- a/examples/user_guide/Performance_and_Debugging.ipynb +++ b/examples/user_guide/Performance_and_Debugging.ipynb @@ -46,6 +46,71 @@ "The first time the app is loaded the data will be cached and subsequent sessions will simply look up the data in the cache, speeding up the process of rendering. If you want to warm up the cache before the first user visits the application you can also provide the `--warm` argument to the `panel serve` command, which will ensure the application is initialized once on launch." ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Threading and Async IO\n", + "\n", + "Threading and async are both approaches to speed up processing in Python using concurrency. Since we can't provide a complete primer on either threading or async here, if you are not familiar with these concepts we recommend reading up on them before continuing. Read about [threading in Python here](https://realpython.com/intro-to-python-threading/) and [AsyncIO here](https://realpython.com/async-io-python/).\n", + "\n", + "When to use which approach cannot be answered easily however as a general guide use `asyncio` when you are performing many slow IO bound operations simultaneously and use threading when you have a limited amount of connections or file operations. In some situations threading can also be useful for CPU intensive operations where the operation being executed [releases the GIL](https://realpython.com/async-io-python/), this includes many NumPy, Pandas and Numba functions.\n", + "\n", + "### Threading\n", + "\n", + "Threading in Panel can be enabled easily by setting the `config.nthreads` parameter. This will start a `ThreadPoolExecutor` with the specified number of threads (or if set to zero it will set the number of threads based on your system, i.e. `min(32, os.cpu_count() + 4)`). Whenever an event is generated or a periodic callback fires Panel will then automatically dispatch the event to the executor. If launching an application with `panel serve` you can configure this option with the `--num-threads` option.\n", + "\n", + "### AsyncIO\n", + "\n", + "When using Python>=3.8 you can use async callbacks wherever you would ordinarily use a regular synchronous function, e.g. you can use `pn.bind` on an async function:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import aiohttp\n", + "\n", + "widget = pn.widgets.IntSlider(start=0, end=10)\n", + "\n", + "async def get_img(index):\n", + " async with aiohttp.ClientSession() as session:\n", + " async with session.get(f\"https://picsum.photos/800/300?image={index}\") as resp:\n", + " return pn.pane.JPG(await resp.read())\n", + " \n", + "pn.Column(widget, pn.bind(get_img, widget))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Similarly you can attach asynchronous callbacks using `.param.watch`:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "widget = pn.widgets.IntSlider(start=0, end=10)\n", + "\n", + "image = pn.pane.JPG()\n", + "\n", + "async def update_img(event):\n", + " async with aiohttp.ClientSession() as session:\n", + " async with session.get(f\"https://picsum.photos/800/300?image={event.new}\") as resp:\n", + " image.object = await resp.read()\n", + " \n", + "widget.param.watch(update_img, 'value')\n", + "widget.param.trigger('value')\n", + " \n", + "pn.Column(widget, image)" + ] + }, { "cell_type": "markdown", "metadata": {}, From 1bd350ca750cfd892c2d8629c15a44f22d14c577 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 30 Nov 2021 14:24:41 +0100 Subject: [PATCH 07/26] Update defaults --- panel/command/serve.py | 4 ++-- panel/config.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/panel/command/serve.py b/panel/command/serve.py index 3482cac7e3..d1c4367699 100644 --- a/panel/command/serve.py +++ b/panel/command/serve.py @@ -179,7 +179,7 @@ class Serve(_BkServe): action = 'store', type = int, help = "Whether to start a thread pool which events are dispatched to.", - default = -1 + default = None )) ) @@ -291,7 +291,7 @@ def customize_kwargs(self, args, server_kwargs): patterns.extend(pattern) state.publish('session_info', state, ['session_info']) - if args.num_threads != -1: + if args.num_threads is not None: config.nthreads = args.num_threads if args.oauth_provider: diff --git a/panel/config.py b/panel/config.py index 23b1effe84..1c4ddfd2c3 100644 --- a/panel/config.py +++ b/panel/config.py @@ -126,7 +126,7 @@ class _config(_base_config): theme = param.ObjectSelector(default='default', objects=['default', 'dark'], doc=""" The theme to apply to the selected global template.""") - nthreads = param.Integer(default=0, doc=""" + nthreads = param.Integer(default=None, doc=""" When set to a non-None value a thread pool will be started. Whenever an event arrives from the frontend it will be dispatched to the thread pool to be processed.""") From 8bb82a886a4fe4487977ed971f0e692173233301 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Wed, 1 Dec 2021 11:43:09 +0100 Subject: [PATCH 08/26] Improve docs --- examples/user_guide/Deploy_and_Export.ipynb | 4 +++- examples/user_guide/Overview.ipynb | 2 +- .../user_guide/Performance_and_Debugging.ipynb | 17 +++++++++++++---- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/examples/user_guide/Deploy_and_Export.ipynb b/examples/user_guide/Deploy_and_Export.ipynb index f881ea1e0f..994d685d6b 100644 --- a/examples/user_guide/Deploy_and_Export.ipynb +++ b/examples/user_guide/Deploy_and_Export.ipynb @@ -382,7 +382,9 @@ " no index is served.\n", " --num-procs N Number of worker processes for an app. Using 0 will\n", " autodetect number of cores (defaults to 1)\n", - " --num-threads N Number of threads to launch in a ThreadPoolExecutor which Panel will dispatch events to. \n", + " --num-threads N Number of threads to launch in a ThreadPoolExecutor which\n", + " Panel will dispatch events to for concurrent execution on\n", + " separate cores (defaults to None).\n", " --warm Whether to execute scripts on startup to warm up the server.\n", " --autoreload\n", " Whether to automatically reload user sessions when the application or any of its imports change.\n", diff --git a/examples/user_guide/Overview.ipynb b/examples/user_guide/Overview.ipynb index ab2a05b6a3..1312dd9dec 100644 --- a/examples/user_guide/Overview.ipynb +++ b/examples/user_guide/Overview.ipynb @@ -170,7 +170,7 @@ "> - `js_files`: External JS files to load. Dictionary should map from exported name to the URL of the JS file.\n", "> - `loading_spinner`: The style of the global loading indicator, e.g. 'arcs', 'bars', 'dots', 'petals'.\n", "> - `loading_color`: The color of the global loading indicator as a hex color, e.g. #6a6a6a\n", - "> - `nthreads`: If set will start a `ThreadPoolExecutor` to dispatch events to.\n", + "> - `nthreads`: If set will start a `ThreadPoolExecutor` to dispatch events to for concurrent execution on separate cores. By default no thread pool is launched, while setting nthreads=0 launches `min(32, os.cpu_count() + 4)` threads.\n", "> - `raw_css`: List of raw CSS strings to add to load.\n", "> - `safe_embed`: Whether to record all set events when embedding rather than just those that are changed\n", "> - `session_history`: If set to a non-zero value this determines the maximum length of the pn.state.session_info dictionary, which tracks information about user sessions. A value of -1 indicates an unlimited history.\n", diff --git a/examples/user_guide/Performance_and_Debugging.ipynb b/examples/user_guide/Performance_and_Debugging.ipynb index c4edf489ef..af83bf73e4 100644 --- a/examples/user_guide/Performance_and_Debugging.ipynb +++ b/examples/user_guide/Performance_and_Debugging.ipynb @@ -52,17 +52,17 @@ "source": [ "## Threading and Async IO\n", "\n", - "Threading and async are both approaches to speed up processing in Python using concurrency. Since we can't provide a complete primer on either threading or async here, if you are not familiar with these concepts we recommend reading up on them before continuing. Read about [threading in Python here](https://realpython.com/intro-to-python-threading/) and [AsyncIO here](https://realpython.com/async-io-python/).\n", + "Threading and async are both approaches to speed up processing in Python using concurrency. Since we can't provide a complete primer on either threading or asynchronous processing here, if you are not familiar with these concepts we recommend reading up on them before continuing. Read about [threading in Python here](https://realpython.com/intro-to-python-threading/) and [AsyncIO here](https://realpython.com/async-io-python/).\n", "\n", - "When to use which approach cannot be answered easily however as a general guide use `asyncio` when you are performing many slow IO bound operations simultaneously and use threading when you have a limited amount of connections or file operations. In some situations threading can also be useful for CPU intensive operations where the operation being executed [releases the GIL](https://realpython.com/async-io-python/), this includes many NumPy, Pandas and Numba functions.\n", + "When to use which approach cannot be answered easily and is never completely clear cut. As a general guide however use `asyncio` when you are performing many slow IO bound operations concurrently (on the order of 100 and more) and use threading when you are performing a limited number (less than the number of available threads). In some situations threading can also be useful for CPU intensive operations where the operation being executed [releases the GIL](https://realpython.com/async-io-python/), this includes many NumPy, Pandas and Numba functions.\n", "\n", "### Threading\n", "\n", - "Threading in Panel can be enabled easily by setting the `config.nthreads` parameter. This will start a `ThreadPoolExecutor` with the specified number of threads (or if set to zero it will set the number of threads based on your system, i.e. `min(32, os.cpu_count() + 4)`). Whenever an event is generated or a periodic callback fires Panel will then automatically dispatch the event to the executor. If launching an application with `panel serve` you can configure this option with the `--num-threads` option.\n", + "Threading in Panel can be enabled easily by setting the `config.nthreads` parameter. This will start a `ThreadPoolExecutor` with the specified number of threads (or if set to `0` it will set the number of threads based on your system, i.e. `min(32, os.cpu_count() + 4)`). Whenever an event is generated or a periodic callback fires Panel will then automatically dispatch the event to the executor. If launching an application with `panel serve` you can configure this option with the `--num-threads` option.\n", "\n", "### AsyncIO\n", "\n", - "When using Python>=3.8 you can use async callbacks wherever you would ordinarily use a regular synchronous function, e.g. you can use `pn.bind` on an async function:" + "When using Python>=3.8 you can use async callbacks wherever you would ordinarily use a regular synchronous function. For instance you can use `pn.bind` on an async function:" ] }, { @@ -87,6 +87,8 @@ "cell_type": "markdown", "metadata": {}, "source": [ + "In this example Panel will invoke the function and update the output when the function returns while leaving the process unblocked for the duration of the `aiohttp` request. \n", + "\n", "Similarly you can attach asynchronous callbacks using `.param.watch`:" ] }, @@ -111,6 +113,13 @@ "pn.Column(widget, image)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In this example Param will await the asynchronous function and the image will be updated when the request completes." + ] + }, { "cell_type": "markdown", "metadata": {}, From 3640358caf07934247a6477e17bf36d5e927732f Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Wed, 1 Dec 2021 11:55:30 +0100 Subject: [PATCH 09/26] Dispatch bokeh events on threads --- panel/pane/vega.py | 5 ++++- panel/reactive.py | 35 +++++++++++++++++++++++++---------- panel/widgets/tables.py | 2 +- panel/widgets/terminal.py | 2 +- 4 files changed, 31 insertions(+), 13 deletions(-) diff --git a/panel/pane/vega.py b/panel/pane/vega.py index 6e7151ea61..2c2dc2532f 100644 --- a/panel/pane/vega.py +++ b/panel/pane/vega.py @@ -226,7 +226,10 @@ def _get_model(self, doc, root=None, parent=None, comm=None): data=json, data_sources=sources, events=self._selections, throttle=self._throttle, **props ) - model.on_event('vega_event', self._process_event) + if comm: + model.on_event('vega_event', self._comm_event) + else: + model.on_event('vega_event', partial(self._server_event, doc)) if root is None: root = model self._models[root.ref['id']] = (model, parent) diff --git a/panel/reactive.py b/panel/reactive.py index 41729b5899..01596b163d 100644 --- a/panel/reactive.py +++ b/panel/reactive.py @@ -283,6 +283,13 @@ def _change_coroutine(self, doc=None): else: self._change_event(doc) + @gen.coroutine + def _event_coroutine(self, event): + if state._thread_pool: + state._thread_pool.submit(self._process_event, event) + else: + self._process_event(event) + def _change_event(self, doc=None): try: state.curdoc = doc @@ -309,11 +316,17 @@ def _comm_change(self, doc, ref, comm, subpath, attr, old, new): else: self._schedule_change(doc, comm) + def _comm_event(self, event): + if state._thread_pool: + state._thread_pool.submit(self._process_event, event) + else: + self._process_event(event) + def _server_event(self, doc, event): state._locks.clear() if doc.session_context: doc.add_timeout_callback( - partial(self._process_event, event), + partial(self._event_coroutine, event), self._debounce ) else: @@ -329,14 +342,16 @@ def _server_change(self, doc, ref, subpath, attr, old, new): state._locks.clear() processing = bool(self._events) self._events.update({attr: new}) - if not processing: - if doc.session_context: - doc.add_timeout_callback( - partial(self._change_coroutine, doc), - self._debounce - ) - else: - self._change_event(doc) + if processing: + return + + if doc.session_context: + doc.add_timeout_callback( + partial(self._change_coroutine, doc), + self._debounce + ) + else: + self._change_event(doc) class Reactive(Syncable, Viewable): @@ -1516,7 +1531,7 @@ def _get_model(self, doc, root=None, parent=None, comm=None): model.children = self._get_children(doc, root, model, comm) if comm: - model.on_event('dom_event', self._process_event) + model.on_event('dom_event', self._comm_event) else: model.on_event('dom_event', partial(self._server_event, doc)) diff --git a/panel/widgets/tables.py b/panel/widgets/tables.py index 2cfcc70b20..64401c5066 100644 --- a/panel/widgets/tables.py +++ b/panel/widgets/tables.py @@ -1282,7 +1282,7 @@ def _get_model(self, doc, root=None, parent=None, comm=None): ) self._link_props(model, ['page', 'sorters', 'expanded', 'filters'], doc, root, comm) if comm: - model.on_event('table-edit', self._process_event) + model.on_event('table-edit', self._comm_event) else: model.on_event('table-edit', partial(self._server_event, doc)) return model diff --git a/panel/widgets/terminal.py b/panel/widgets/terminal.py index 547d8fc436..d7dd43642e 100644 --- a/panel/widgets/terminal.py +++ b/panel/widgets/terminal.py @@ -283,7 +283,7 @@ def _get_model(self, doc, root=None, parent=None, comm=None): model = super()._get_model(doc, root, parent, comm) model.output = self.output if comm: - model.on_event('keystroke', self._process_event) + model.on_event('keystroke', self._comm_event) else: model.on_event('keystroke', partial(self._server_event, doc)) return model From cc20aea2f17c16c7dff417983b0c09589fc9bf89 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Wed, 1 Dec 2021 12:43:21 +0100 Subject: [PATCH 10/26] Add tests --- panel/tests/conftest.py | 8 +++ panel/tests/test_server.py | 115 ++++++++++++++++++++++++++++++++++++- 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/panel/tests/conftest.py b/panel/tests/conftest.py index b4b4ffdea3..e709fb3a07 100644 --- a/panel/tests/conftest.py +++ b/panel/tests/conftest.py @@ -14,6 +14,7 @@ from bokeh.client import pull_session from pyviz_comms import Comm +from panel import config from panel.pane import HTML, Markdown from panel.io import state from panel import serve @@ -182,3 +183,10 @@ def py_file(): tf = tempfile.NamedTemporaryFile(mode='w', suffix='.py') yield tf tf.close() + + +@pytest.fixture +def threads(): + config.nthreads = 4 + yield 4 + config.nthreads = None diff --git a/panel/tests/test_server.py b/panel/tests/test_server.py index 26ceb861de..6320294967 100644 --- a/panel/tests/test_server.py +++ b/panel/tests/test_server.py @@ -9,12 +9,14 @@ from panel.config import config from panel.io import state +from panel.layout import Row from panel.models import HTML as BkHTML +from panel.models.tabulator import TableEditEvent from panel.pane import Markdown from panel.io.resources import DIST_DIR from panel.io.server import get_server, serve, set_curdoc from panel.template import BootstrapTemplate -from panel.widgets import Button +from panel.widgets import Button, Tabulator def test_get_server(html_server_session): @@ -291,3 +293,114 @@ def test_serve_can_serve_bokeh_app_from_file(): path = pathlib.Path(__file__).parent / "io"/"bk_app.py" server = get_server({"bk-app": path}) assert "/bk-app" in server._tornado.applications + + +def test_server_thread_pool_change_event(threads): + button = Button(name='Click') + button2 = Button(name='Click') + + counts = [] + + def cb(event, count=[0]): + count[0] += 1 + counts.append(count[0]) + time.sleep(0.5) + count[0] -= 1 + + button.on_click(cb) + button2.on_click(cb) + layout = Row(button, button2) + + port = 6010 + server = serve(layout, port=port, threaded=True, show=False) + + # Wait for server to start + time.sleep(1) + + requests.get(f"http://localhost:{port}/") + + model = list(layout._models.values())[0][0] + doc = model.document + with set_curdoc(doc): + button._server_change(doc, model.ref['id'], None, 'clicks', 0, 1) + button2._server_change(doc, model.ref['id'], None, 'clicks', 0, 1) + + # Wait for callbacks to be scheduled + time.sleep(1) + + # Checks whether Button on_click callback was executed concurrently + assert max(counts) == 2 + + +def test_server_thread_pool_bokeh_event(threads): + import pandas as pd + + df = pd.DataFrame([[1, 1], [2, 2]], columns=['A', 'B']) + + tabulator = Tabulator(df) + + counts = [] + + def cb(event, count=[0]): + count[0] += 1 + counts.append(count[0]) + time.sleep(0.5) + count[0] -= 1 + + tabulator.on_edit(cb) + + port = 6011 + server = serve(tabulator, port=port, threaded=True, show=False) + + # Wait for server to start + time.sleep(1) + + requests.get(f"http://localhost:{port}/") + + model = list(tabulator._models.values())[0][0] + doc = model.document + event = TableEditEvent(model, 'A', 0) + with set_curdoc(doc): + for _ in range(2): + tabulator._server_event(doc, event) + + # Wait for callbacks to be scheduled + time.sleep(1) + + # Checks whether Tabulator on_edit callback was executed concurrently + assert max(counts) == 2 + + +def test_server_thread_pool_periodic(threads): + button = Button(name='Click') + + counts = [] + + def setup(event): + state.add_periodic_callback(cb, 100) + + def cb(count=[0]): + count[0] += 1 + counts.append(count[0]) + time.sleep(0.5) + count[0] -= 1 + + button.on_click(setup) + + port = 6012 + server = serve(button, port=port, threaded=True, show=False) + + # Wait for server to start + time.sleep(1) + + requests.get(f"http://localhost:{port}/") + + doc = list(button._models.values())[0][0].document + with set_curdoc(doc): + button.clicks += 1 + + # Wait for callbacks to be scheduled + time.sleep(1) + + # Checks whether periodic callbacks were executed concurrently + assert max(counts) >= 2 From 7500a7f34f7a5a279d6c9775ce2d340934cf7a09 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Wed, 1 Dec 2021 12:44:33 +0100 Subject: [PATCH 11/26] Fix flakes --- panel/io/callbacks.py | 1 - panel/pane/vega.py | 2 ++ panel/reactive.py | 1 - 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/panel/io/callbacks.py b/panel/io/callbacks.py index b1b57efe05..4e22481f56 100644 --- a/panel/io/callbacks.py +++ b/panel/io/callbacks.py @@ -113,7 +113,6 @@ async def _periodic_callback(self): inspect.iscoroutinefunction(self.callback) ) if state._thread_pool and not is_async: - from tornado.ioloop import IOLoop state._thread_pool.submit(self._exec_callback, True) return try: diff --git a/panel/pane/vega.py b/panel/pane/vega.py index 2c2dc2532f..fa3d3d7396 100644 --- a/panel/pane/vega.py +++ b/panel/pane/vega.py @@ -1,5 +1,7 @@ import sys +from functools import partial + import param import numpy as np diff --git a/panel/reactive.py b/panel/reactive.py index 01596b163d..9116d2b34b 100644 --- a/panel/reactive.py +++ b/panel/reactive.py @@ -9,7 +9,6 @@ import re import sys import textwrap -import threading from collections import Counter, defaultdict, namedtuple from functools import partial From 1b8d5873409b2575e409a5ddfcba4935371b2335 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Wed, 1 Dec 2021 15:09:57 +0100 Subject: [PATCH 12/26] Fix flakes --- panel/tests/test_server.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/panel/tests/test_server.py b/panel/tests/test_server.py index 6320294967..0d2fdc8214 100644 --- a/panel/tests/test_server.py +++ b/panel/tests/test_server.py @@ -312,7 +312,7 @@ def cb(event, count=[0]): layout = Row(button, button2) port = 6010 - server = serve(layout, port=port, threaded=True, show=False) + serve(layout, port=port, threaded=True, show=False) # Wait for server to start time.sleep(1) @@ -350,7 +350,7 @@ def cb(event, count=[0]): tabulator.on_edit(cb) port = 6011 - server = serve(tabulator, port=port, threaded=True, show=False) + serve(tabulator, port=port, threaded=True, show=False) # Wait for server to start time.sleep(1) @@ -388,7 +388,7 @@ def cb(count=[0]): button.on_click(setup) port = 6012 - server = serve(button, port=port, threaded=True, show=False) + serve(button, port=port, threaded=True, show=False) # Wait for server to start time.sleep(1) From 363a3cf0729f1898956697e25a4ae7ff277e480a Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Wed, 1 Dec 2021 17:30:38 +0100 Subject: [PATCH 13/26] Fix config._set_thread_pool --- panel/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/panel/config.py b/panel/config.py index 1c4ddfd2c3..20dd222597 100644 --- a/panel/config.py +++ b/panel/config.py @@ -213,12 +213,12 @@ def __init__(self, **params): @param.depends('nthreads', watch=True) def _set_thread_pool(self): + from .io.state import state if self.nthreads is None: if state._thread_pool is not None: state._thread_pool.shutdown(wait=False) state._thread_pool = None return - from .io.state import state if state._thread_pool: raise RuntimeError("Thread pool already running") threads = self.nthreads if self.nthreads else None From dbe52709aae6ea89836e979ad8ee8f8faa08d2de Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Mon, 6 Dec 2021 10:30:56 +0100 Subject: [PATCH 14/26] Undo name change --- panel/io/server.py | 2 +- panel/io/state.py | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/panel/io/server.py b/panel/io/server.py index ad9a2010af..73b3b0d399 100644 --- a/panel/io/server.py +++ b/panel/io/server.py @@ -404,7 +404,7 @@ def init_doc(doc): thread = threading.current_thread() if thread: with set_curdoc(doc): - state._main_thread = thread.ident + state._thread_id = thread.ident session_id = doc.session_context.id sessions = state.session_info['sessions'] diff --git a/panel/io/state.py b/panel/io/state.py index 8e79f6ec58..caa30dee27 100644 --- a/panel/io/state.py +++ b/panel/io/state.py @@ -67,7 +67,7 @@ class _state(param.Parameterized): _hold = False # Used to ensure that events are not scheduled from the wrong thread - _main_thread_ = WeakKeyDictionary() + _thread_id_ = WeakKeyDictionary() _thread_pool = None _comm_manager = _CommManager @@ -121,17 +121,17 @@ def __repr__(self): return "state(servers=[\n {}\n])".format(",\n ".join(server_info)) @property - def _main_thread(self): - return self._main_thread_[self.curdoc] + def _thread_id(self): + return self._thread_id_[self.curdoc] - @_main_thread.setter - def _main_thread(self, thread_id): - self._main_thread_[self.curdoc] = thread_id + @_thread_id.setter + def _thread_id(self, thread_id): + self._thread_id_[self.curdoc] = thread_id def _unblocked(self, doc): thread = threading.current_thread() thread_id = thread.ident if thread else None - return doc is self.curdoc and self._main_thread == thread_id + return doc is self.curdoc and self._thread_id == thread_id @param.depends('busy', watch=True) def _update_busy(self): From bc45a5a24d8b236c2480c36b734ba4f813f79c1f Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Mon, 6 Dec 2021 17:15:03 +0100 Subject: [PATCH 15/26] Use event API for button clicks --- panel/widgets/button.py | 40 +++++++++++++--------------------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/panel/widgets/button.py b/panel/widgets/button.py index 316621da08..62c8d46663 100644 --- a/panel/widgets/button.py +++ b/panel/widgets/button.py @@ -35,7 +35,10 @@ class _ClickButton(_ButtonBase): def _get_model(self, doc, root=None, parent=None, comm=None): model = super()._get_model(doc, root, parent, comm) ref = (root or model).ref['id'] - model.on_click(partial(self._server_click, doc, ref)) + if comm: + model.on_event(self._event, self._comm_event) + else: + model.on_event(self._event, partial(self._server_event, doc)) return model def js_on_click(self, args={}, code=""): @@ -108,21 +111,9 @@ def jslink(self, target, code=None, args=None, bidirectional=False, **links): jslink.__doc__ = Widget.jslink.__doc__ - def _server_click(self, doc, ref, event): - processing = bool(self._events) - self._events.update({"clicks": self.clicks+1}) + def _process_event(self, event): self.param.trigger('value') - if not processing: - if doc.session_context: - doc.add_timeout_callback(partial(self._change_coroutine, doc), self._debounce) - else: - self._change_event(doc) - - def _process_property_change(self, msg): - msg = super()._process_property_change(msg) - if 'clicks' in msg: - msg['clicks'] = self.clicks + 1 - return msg + self.clicks += 1 def on_click(self, callback): self.param.watch(callback, 'clicks', onlychanged=False) @@ -162,17 +153,12 @@ class MenuButton(_ClickButton): _event = 'menu_item_click' - def on_click(self, callback): - self.param.watch(callback, 'clicked', onlychanged=False) - - def _server_click(self, doc, ref, event): - processing = bool(self._events) + def _process_event(self, event): if isinstance(event, MenuItemClick): - self._events.update({"clicked": event.item}) + item = event.item elif isinstance(event, ButtonClick): - self._events.update({"clicked": self.name}) - if not processing: - if doc.session_context: - doc.add_timeout_callback(partial(self._change_coroutine, doc), self._debounce) - else: - self._change_event(doc) + item = self.name + self.clicked = item + + def on_click(self, callback): + self.param.watch(callback, 'clicked', onlychanged=False) From 9597a3281c25e33fcc8ff2ddd95397f2954c6b23 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Mon, 6 Dec 2021 17:15:35 +0100 Subject: [PATCH 16/26] Do not server debounce events --- panel/reactive.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/panel/reactive.py b/panel/reactive.py index 9116d2b34b..19f69e53a3 100644 --- a/panel/reactive.py +++ b/panel/reactive.py @@ -324,9 +324,8 @@ def _comm_event(self, event): def _server_event(self, doc, event): state._locks.clear() if doc.session_context: - doc.add_timeout_callback( - partial(self._event_coroutine, event), - self._debounce + doc.add_next_tick_callback( + partial(self._event_coroutine, event) ) else: self._process_event(event) From 2fba19ff5ec678adaf87dc0320096c736e934586 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Mon, 6 Dec 2021 17:16:28 +0100 Subject: [PATCH 17/26] Update docs --- .../Performance_and_Debugging.ipynb | 76 ++++++++++++++++++- 1 file changed, 72 insertions(+), 4 deletions(-) diff --git a/examples/user_guide/Performance_and_Debugging.ipynb b/examples/user_guide/Performance_and_Debugging.ipynb index af83bf73e4..05d6ea9128 100644 --- a/examples/user_guide/Performance_and_Debugging.ipynb +++ b/examples/user_guide/Performance_and_Debugging.ipynb @@ -50,15 +50,83 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Threading and Async IO\n", + "## Concurrent processing\n", "\n", - "Threading and async are both approaches to speed up processing in Python using concurrency. Since we can't provide a complete primer on either threading or asynchronous processing here, if you are not familiar with these concepts we recommend reading up on them before continuing. Read about [threading in Python here](https://realpython.com/intro-to-python-threading/) and [AsyncIO here](https://realpython.com/async-io-python/).\n", + "When deploying a Panel application to be accessed by multiple users they will often access the same server simultaneously. To maintain responsiveness of the application when multiple users are interacting with it at the same time there are multiple approaches to concurrency, each with their own drawbacks and advantages:\n", "\n", - "When to use which approach cannot be answered easily and is never completely clear cut. As a general guide however use `asyncio` when you are performing many slow IO bound operations concurrently (on the order of 100 and more) and use threading when you are performing a limited number (less than the number of available threads). In some situations threading can also be useful for CPU intensive operations where the operation being executed [releases the GIL](https://realpython.com/async-io-python/), this includes many NumPy, Pandas and Numba functions.\n", + "1. `Load balancing`: A load balancer launches multiple instances of the Panel application and distributes network traffic between them. This is ensures that users the load is distributed across multiple servers but also requires a lot configuration and resources.\n", + "2. `Multiple processes`: Launches multiple processes on a single machine, effectively load balancing across the processes. Much simpler to set up than a load balancer but you are limited by the compute and memory resources on one machine.\n", + "2. `Threading`: Attempts to distribute processing across multiple threads. Effectiveness depends on the operations being performed, I/O bound and CPU bound operations that release the GIL can easily be made concurrent in this way. \n", + "3. `AsyncIO`: Allows asynchronously processing I/O bound operations. Effective for many concurrent I/O operations but requires rewriting your application and callbacks to make use of `async`/`await` paradigm.\n", + "\n", + "### Scaling across processes\n", + "\n", + "Both load balancing and starting multiple processes effectively spin up multiple copies of the same application and distribute the load across the processes. This results in duplication and therefore significantly higher overhead (basically scaling linearly with the number of processes). In applications where you are relying on global state (e.g. the `pn.state.cache`) this can introduce significant challenges to ensure that application state stays synchronized.\n", + "\n", + "#### Load balancing\n", + "\n", + "Setting up load balancing is a huge topic dependent on the precise system you are using so we won't go into any specific implementation here. In most cases you set up a reverse proxy (like NGINX) to distribute the load across multiple application servers. If you are using a system like Kubernetes it will also handle spinning up the servers for you and can even do so dynamically depending on the amount of concurrent users to ensure that you are not wasting resources when there are fewer users.\n", + "\n", + "
\n", + "\n", + "
Diagram showing concept of load balacing (NGINX)
\n", + "
\n", + "\n", + "Load balancing is the most complex approach to set up but is guaranteed to improve concurrent usage of your application since different users are not contending for access to the same process or even necessarily the same physical compute and memory resources. At the same time it is more wasteful of resources since it potentially occupies multiple machines and since each process is isolated there is no sharing of cached data or global state. \n", + "\n", + "#### Multiple processes\n", + "\n", + "Launching a Panel application on multiple processes is a effectively a simpler version of load balancing with many of the same advantages and drawbacks. One major advantage is that it is easy to set up, when deploying your application with `panel serve` simply configure `--num-procs N`, where N is the number of processes. Generally choose an `N` that is no larger than the number of processors on your machine. This still uses significantly more resources since each process has the same overhead and all processes will be contending for the same memory and compute resources. However if your application is single-threaded and you have sufficient memory this is a simple way to make your application scale.\n", + "\n", + "### Scaling within a single process\n", + "\n", + "Threading and async are both approaches to speed up processing in Python using concurrency in a single Python process. Since we can't provide a complete primer on either threading or asynchronous processing here, if you are not familiar with these concepts we recommend reading up on them before continuing. Read about [threading in Python here](https://realpython.com/intro-to-python-threading/) and [AsyncIO here](https://realpython.com/async-io-python/).\n", + "\n", + "When to use which approach cannot be answered easily and is never completely clear cut. As a general guide however use `asyncio` can scale almost arbitrarily allowing you to perform thousands or even millions of IO bound operations concurrently, while threading limits you to the number of available threads. In practice this may never actually become relevant so the other main differences are that `async` coroutines are significantly more lightweight but that you have to carefully consider accessing shared objects across threads. Using `async` coroutines makes it very clear where concurrency occurs and therefore can make it easier to avoid race conditions and avoid having to think about locking a thread to access shared objects. However, in some situations threading can also be useful for CPU intensive operations where the operation being executed [releases the GIL](https://realpython.com/python-gil/), this includes many NumPy, Pandas and Numba functions.\n", "\n", "### Threading\n", "\n", - "Threading in Panel can be enabled easily by setting the `config.nthreads` parameter. This will start a `ThreadPoolExecutor` with the specified number of threads (or if set to `0` it will set the number of threads based on your system, i.e. `min(32, os.cpu_count() + 4)`). Whenever an event is generated or a periodic callback fires Panel will then automatically dispatch the event to the executor. If launching an application with `panel serve` you can configure this option with the `--num-threads` option.\n", + "Using threading in Panel can either be enabled manually, e.g. by managing your own thread pool and dispatching concurrent tasks to it, or it can be managed by Panel itself by setting the `config.nthreads` parameter (or equivalently by setting it with `pn.extension(nthreads=...)`. This will start a `ThreadPoolExecutor` with the specified number of threads (or if set to `0` it will set the number of threads based on your system, i.e. `min(32, os.cpu_count() + 4)`). \n", + "\n", + "Whenever an event is generated or a periodic callback fires Panel will then automatically dispatch the event to the executor. An event in this case refers to any action generated on the frontend such as the manipulation of a widget by a user or the interaction with a plot. If you are launching an application with `panel serve` you should enable this option configure this option on the CLI by setting `--num-threads`.\n", + "\n", + "To demonstrate the effect of enabling threading take this example below:\n", + "\n", + "```python\n", + "import panel as pn\n", + "\n", + "pn.extension(nthreads=2)\n", + "\n", + "def button_click(event):\n", + " print('Button clicked for the {event.new}th time.')\n", + " time.sleep(2) # Simulate long running operation\n", + " print('Finished processing {event.new}th click.')\n", + " \n", + "button = pn.widgets.Button(name='Click me!')\n", + "\n", + "button.on_click(button_click)\n", + "```\n", + "\n", + "When we click the button twice successively in a single-threaded context we will see the following output:\n", + "\n", + "```\n", + "> Button clicked for the 1th time.\n", + "... 2 second wait\n", + "> Finished processing 1th click.\n", + "> Button clicked for the 2th time.\n", + "... 2 second wait\n", + "> Finished processing 2th click.\n", + "```\n", + "\n", + "In a threaded context on the other hand the two clicks will be processed concurrently:\n", + "\n", + "```\n", + "> Button clicked for the 1th time.\n", + "> Button clicked for the 2th time.\n", + "... 2 second wait\n", + "> Finished processing 1th click.\n", + "> Finished processing 2th click.\n", + "```\n", "\n", "### AsyncIO\n", "\n", From 6bec173e442371192d463b6d6a3bc84591257d8a Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Mon, 6 Dec 2021 18:52:34 +0100 Subject: [PATCH 18/26] Fix flake --- panel/widgets/button.py | 1 - 1 file changed, 1 deletion(-) diff --git a/panel/widgets/button.py b/panel/widgets/button.py index 62c8d46663..0cf0a89999 100644 --- a/panel/widgets/button.py +++ b/panel/widgets/button.py @@ -34,7 +34,6 @@ class _ClickButton(_ButtonBase): def _get_model(self, doc, root=None, parent=None, comm=None): model = super()._get_model(doc, root, parent, comm) - ref = (root or model).ref['id'] if comm: model.on_event(self._event, self._comm_event) else: From bd743445ed4e796800a16be3a21e0e5b5a81d0f3 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Mon, 6 Dec 2021 18:52:48 +0100 Subject: [PATCH 19/26] Add aiohttp to example envs --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index b05ca887e4..8d417a0c8b 100644 --- a/setup.py +++ b/setup.py @@ -150,7 +150,8 @@ def run(self): 'ipyvolume', 'ipyleaflet', 'xarray', - 'pyinstrument >=4.0' + 'pyinstrument >=4.0', + 'aiohttp' ], 'tests': _tests, 'recommended': _recommended, From b8afc2cd4a53c6aa6d61bcc7ed1c4ec4ed7e3e08 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 7 Dec 2021 11:14:21 +0100 Subject: [PATCH 20/26] Fix up Button tests --- panel/tests/widgets/test_button.py | 25 ++++++++++++++++++++++--- panel/widgets/button.py | 2 +- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/panel/tests/widgets/test_button.py b/panel/tests/widgets/test_button.py index 86309fb2be..d30c277ffb 100644 --- a/panel/tests/widgets/test_button.py +++ b/panel/tests/widgets/test_button.py @@ -1,4 +1,6 @@ -from panel.widgets import Button, Toggle +from bokeh.events import ButtonClick, MenuItemClick + +from panel.widgets import Button, MenuButton, Toggle def test_button(document, comm): @@ -9,7 +11,7 @@ def test_button(document, comm): assert isinstance(widget, button._widget_type) assert widget.label == 'Button' - button._process_events({'clicks': 1}) + button._process_event(None) assert button.clicks == 1 @@ -25,11 +27,28 @@ def callback(event): button.param.watch(callback, 'value') assert button.value == False - button._server_click(document, widget.ref['id'], None) + button._process_event(ButtonClick(widget)) assert events == [True] assert button.value == False +def test_menu_button(document, comm): + menu_items = [('Option A', 'a'), ('Option B', 'b'), ('Option C', 'c'), None, ('Help', 'help')] + menu_button = MenuButton(items=menu_items) + + widget = menu_button.get_root(document, comm=comm) + + events = [] + def callback(event): + events.append(event.new) + + menu_button.param.watch(callback, 'clicked') + + menu_button._process_event(MenuItemClick(widget, 'b')) + + assert events == ['b'] + + def test_button_jscallback_clicks(document, comm): button = Button(name='Button') code = 'console.log("Clicked!")' diff --git a/panel/widgets/button.py b/panel/widgets/button.py index 0cf0a89999..5b9681cadb 100644 --- a/panel/widgets/button.py +++ b/panel/widgets/button.py @@ -10,7 +10,7 @@ Button as _BkButton, Toggle as _BkToggle, Dropdown as _BkDropdown ) -from bokeh.events import (MenuItemClick, ButtonClick) +from bokeh.events import MenuItemClick, ButtonClick from .base import Widget From 89dd2f080968b4c072e33932fd01e6b18345e262 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 7 Dec 2021 11:59:49 +0100 Subject: [PATCH 21/26] Small fixes --- panel/io/server.py | 5 ++++- panel/io/state.py | 8 +++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/panel/io/server.py b/panel/io/server.py index 73b3b0d399..80d9511be8 100644 --- a/panel/io/server.py +++ b/panel/io/server.py @@ -804,7 +804,10 @@ def run(self): bokeh_server = target(*args, **kwargs) finally: if isinstance(bokeh_server, Server): - bokeh_server.stop() + try: + bokeh_server.stop() + except Exception: + pass if hasattr(self, '_target'): del self._target, self._args, self._kwargs else: diff --git a/panel/io/state.py b/panel/io/state.py index caa30dee27..187c3a9946 100644 --- a/panel/io/state.py +++ b/panel/io/state.py @@ -122,7 +122,7 @@ def __repr__(self): @property def _thread_id(self): - return self._thread_id_[self.curdoc] + return self._thread_id_.get(self.curdoc) @_thread_id.setter def _thread_id(self, thread_id): @@ -261,6 +261,12 @@ def add_periodic_callback(self, callback, period=500, count=None, callback=callback, period=period, count=count, timeout=timeout ) if start: + if self._thread_id is not None: + thread = threading.current_thread() + thread_id = thread.ident if thread else Non + if self._thread_id != thread_id: + self.curdoc.add_next_tick_callback(cb.start) + return cb cb.start() return cb From cb28e7bfbd80a2a2d49a5510a3ac4efc5d87b99a Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 7 Dec 2021 12:00:22 +0100 Subject: [PATCH 22/26] Clean up server tests --- panel/tests/test_server.py | 71 +++++++++++--------------------------- 1 file changed, 21 insertions(+), 50 deletions(-) diff --git a/panel/tests/test_server.py b/panel/tests/test_server.py index 0d2fdc8214..c892ddc006 100644 --- a/panel/tests/test_server.py +++ b/panel/tests/test_server.py @@ -57,12 +57,9 @@ def test_server_static_dirs(): # Wait for server to start time.sleep(1) - try: - r = requests.get("http://localhost:6000/tests/test_server.py") - with open(__file__, encoding='utf-8') as f: - assert f.read() == r.content.decode('utf-8').replace('\r\n', '\n') - finally: - server.stop() + r = requests.get("http://localhost:6000/tests/test_server.py") + with open(__file__, encoding='utf-8') as f: + assert f.read() == r.content.decode('utf-8').replace('\r\n', '\n') def test_server_template_static_resources(): @@ -73,12 +70,9 @@ def test_server_template_static_resources(): # Wait for server to start time.sleep(1) - try: - r = requests.get("http://localhost:6001/static/extensions/panel/bundled/bootstraptemplate/bootstrap.css") - with open(DIST_DIR / 'bundled' / 'bootstraptemplate' / 'bootstrap.css', encoding='utf-8') as f: - assert f.read() == r.content.decode('utf-8').replace('\r\n', '\n') - finally: - server.stop() + r = requests.get("http://localhost:6001/static/extensions/panel/bundled/bootstraptemplate/bootstrap.css") + with open(DIST_DIR / 'bundled' / 'bootstraptemplate' / 'bootstrap.css', encoding='utf-8') as f: + assert f.read() == r.content.decode('utf-8').replace('\r\n', '\n') def test_server_template_static_resources_with_prefix(): @@ -89,12 +83,9 @@ def test_server_template_static_resources_with_prefix(): # Wait for server to start time.sleep(1) - try: - r = requests.get("http://localhost:6004/prefix/static/extensions/panel/bundled/bootstraptemplate/bootstrap.css") - with open(DIST_DIR / 'bundled' / 'bootstraptemplate' / 'bootstrap.css', encoding='utf-8') as f: - assert f.read() == r.content.decode('utf-8').replace('\r\n', '\n') - finally: - server.stop() + r = requests.get("http://localhost:6004/prefix/static/extensions/panel/bundled/bootstraptemplate/bootstrap.css") + with open(DIST_DIR / 'bundled' / 'bootstraptemplate' / 'bootstrap.css', encoding='utf-8') as f: + assert f.read() == r.content.decode('utf-8').replace('\r\n', '\n') def test_server_template_static_resources_with_prefix_relative_url(): @@ -105,12 +96,9 @@ def test_server_template_static_resources_with_prefix_relative_url(): # Wait for server to start time.sleep(1) - try: - r = requests.get("http://localhost:6005/prefix/template") - content = r.content.decode('utf-8') - assert 'href="static/extensions/panel/bundled/bootstraptemplate/bootstrap.css"' in content - finally: - server.stop() + r = requests.get("http://localhost:6005/prefix/template") + content = r.content.decode('utf-8') + assert 'href="static/extensions/panel/bundled/bootstraptemplate/bootstrap.css"' in content def test_server_template_static_resources_with_subpath_and_prefix_relative_url(): @@ -121,12 +109,9 @@ def test_server_template_static_resources_with_subpath_and_prefix_relative_url() # Wait for server to start time.sleep(1) - try: - r = requests.get("http://localhost:6005/prefix/subpath/template") - content = r.content.decode('utf-8') - assert 'href="../static/extensions/panel/bundled/bootstraptemplate/bootstrap.css"' in content - finally: - server.stop() + r = requests.get("http://localhost:6005/prefix/subpath/template") + content = r.content.decode('utf-8') + assert 'href="../static/extensions/panel/bundled/bootstraptemplate/bootstrap.css"' in content def test_server_extensions_on_root(): @@ -137,11 +122,8 @@ def test_server_extensions_on_root(): # Wait for server to start time.sleep(1) - try: - r = requests.get("http://localhost:6006/static/extensions/panel/css/loading.css") - assert r.ok - finally: - server.stop() + r = requests.get("http://localhost:6006/static/extensions/panel/css/loading.css") + assert r.ok def test_autoload_js(): @@ -156,11 +138,8 @@ def test_autoload_js(): args = f"bokeh-autoload-element=1002&bokeh-app-path=/{app_name}&bokeh-absolute-url=http://localhost:{port}/{app_name}" r = requests.get(f"http://localhost:{port}/{app_name}/autoload.js?{args}") - try: - assert r.status_code == 200 - assert f"http://localhost:{port}/static/extensions/panel/css/alerts.css" in r.content.decode('utf-8') - finally: - server.stop() + assert r.status_code == 200 + assert f"http://localhost:{port}/static/extensions/panel/css/alerts.css" in r.content.decode('utf-8') def test_server_async_callbacks(): @@ -193,10 +172,7 @@ async def cb(event, count=[0]): time.sleep(2) # Ensure multiple callbacks started concurrently - try: - assert max(counts) > 1 - finally: - server.stop() + assert max(counts) > 1 def test_serve_config_per_session_state(): @@ -376,17 +352,12 @@ def test_server_thread_pool_periodic(threads): counts = [] - def setup(event): - state.add_periodic_callback(cb, 100) - def cb(count=[0]): count[0] += 1 counts.append(count[0]) time.sleep(0.5) count[0] -= 1 - button.on_click(setup) - port = 6012 serve(button, port=port, threaded=True, show=False) @@ -397,7 +368,7 @@ def cb(count=[0]): doc = list(button._models.values())[0][0].document with set_curdoc(doc): - button.clicks += 1 + state.add_periodic_callback(cb, 100) # Wait for callbacks to be scheduled time.sleep(1) From 9d6c23d19f3fb149331f4d28a3d0b4d70977be74 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 7 Dec 2021 12:29:33 +0100 Subject: [PATCH 23/26] Fix flakes --- panel/tests/test_server.py | 40 +++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/panel/tests/test_server.py b/panel/tests/test_server.py index c892ddc006..8c7ca0d423 100644 --- a/panel/tests/test_server.py +++ b/panel/tests/test_server.py @@ -52,7 +52,7 @@ def test_server_static_dirs(): html = Markdown('# Title') static = {'tests': os.path.dirname(__file__)} - server = serve(html, port=6000, threaded=True, static_dirs=static, show=False) + serve(html, port=6000, threaded=True, static_dirs=static, show=False) # Wait for server to start time.sleep(1) @@ -65,7 +65,7 @@ def test_server_static_dirs(): def test_server_template_static_resources(): template = BootstrapTemplate() - server = serve({'template': template}, port=6001, threaded=True, show=False) + serve({'template': template}, port=6001, threaded=True, show=False) # Wait for server to start time.sleep(1) @@ -78,7 +78,7 @@ def test_server_template_static_resources(): def test_server_template_static_resources_with_prefix(): template = BootstrapTemplate() - server = serve({'template': template}, port=6004, threaded=True, show=False, prefix='prefix') + serve({'template': template}, port=6004, threaded=True, show=False, prefix='prefix') # Wait for server to start time.sleep(1) @@ -91,7 +91,7 @@ def test_server_template_static_resources_with_prefix(): def test_server_template_static_resources_with_prefix_relative_url(): template = BootstrapTemplate() - server = serve({'template': template}, port=6005, threaded=True, show=False, prefix='prefix') + serve({'template': template}, port=6005, threaded=True, show=False, prefix='prefix') # Wait for server to start time.sleep(1) @@ -104,7 +104,7 @@ def test_server_template_static_resources_with_prefix_relative_url(): def test_server_template_static_resources_with_subpath_and_prefix_relative_url(): template = BootstrapTemplate() - server = serve({'/subpath/template': template}, port=6005, threaded=True, show=False, prefix='prefix') + serve({'/subpath/template': template}, port=6005, threaded=True, show=False, prefix='prefix') # Wait for server to start time.sleep(1) @@ -117,7 +117,7 @@ def test_server_template_static_resources_with_subpath_and_prefix_relative_url() def test_server_extensions_on_root(): html = Markdown('# Title') - server = serve(html, port=6006, threaded=True, show=False) + serve(html, port=6006, threaded=True, show=False) # Wait for server to start time.sleep(1) @@ -130,7 +130,7 @@ def test_autoload_js(): html = Markdown('# Title') port = 6007 app_name = 'test' - server = serve({app_name: html}, port=port, threaded=True, show=False) + serve({app_name: html}, port=port, threaded=True, show=False) # Wait for server to start time.sleep(0.5) @@ -156,7 +156,7 @@ async def cb(event, count=[0]): button.on_click(cb) - server = serve(button, port=6002, threaded=True, show=False) + serve(button, port=6002, threaded=True, show=False) # Wait for server to start time.sleep(1) @@ -183,29 +183,25 @@ def app1(): def app2(): config.raw_css = [CSS2] - server1 = serve(app1, port=6004, threaded=True, show=False) - server2 = serve(app2, port=6005, threaded=True, show=False) + serve(app1, port=6004, threaded=True, show=False) + serve(app2, port=6005, threaded=True, show=False) r1 = requests.get("http://localhost:6004/").content.decode('utf-8') r2 = requests.get("http://localhost:6005/").content.decode('utf-8') - try: - assert CSS1 not in config.raw_css - assert CSS2 not in config.raw_css - assert CSS1 in r1 - assert CSS2 not in r1 - assert CSS1 not in r2 - assert CSS2 in r2 - finally: - server1.stop() - server2.stop() + assert CSS1 not in config.raw_css + assert CSS2 not in config.raw_css + assert CSS1 in r1 + assert CSS2 not in r1 + assert CSS1 not in r2 + assert CSS2 in r2 def test_server_session_info(): with config.set(session_history=-1): html = Markdown('# Title') - server = serve(html, port=6003, threaded=True, show=False) + serve(html, port=6003, threaded=True, show=False) # Wait for server to start time.sleep(1) @@ -227,7 +223,6 @@ def test_server_session_info(): state._init_session(None) assert state.session_info['live'] == 1 - server.stop() state.curdoc = None html._server_destroy(session_context) assert state.session_info['live'] == 0 @@ -265,6 +260,7 @@ def test_serve_can_serve_panel_app_from_file(): server = get_server({"panel-app": path}) assert "/panel-app" in server._tornado.applications + def test_serve_can_serve_bokeh_app_from_file(): path = pathlib.Path(__file__).parent / "io"/"bk_app.py" server = get_server({"bk-app": path}) From 6fce70b35c9f5292a21e61aec4d5564bdf03b87f Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 7 Dec 2021 12:47:32 +0100 Subject: [PATCH 24/26] Fix flake --- panel/io/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/panel/io/state.py b/panel/io/state.py index 187c3a9946..c35d76e5d7 100644 --- a/panel/io/state.py +++ b/panel/io/state.py @@ -263,7 +263,7 @@ def add_periodic_callback(self, callback, period=500, count=None, if start: if self._thread_id is not None: thread = threading.current_thread() - thread_id = thread.ident if thread else Non + thread_id = thread.ident if thread else None if self._thread_id != thread_id: self.curdoc.add_next_tick_callback(cb.start) return cb From 63b60b0c4119192750b5815223e425f2c17fe121 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 4 Jan 2022 19:48:33 +0100 Subject: [PATCH 25/26] Various fixes for tests --- panel/io/state.py | 2 +- panel/tests/conftest.py | 33 +++++++++++++++++++----------- panel/tests/test_server.py | 6 +++--- panel/tests/widgets/test_select.py | 4 ++-- 4 files changed, 27 insertions(+), 18 deletions(-) diff --git a/panel/io/state.py b/panel/io/state.py index c35d76e5d7..8873866945 100644 --- a/panel/io/state.py +++ b/panel/io/state.py @@ -122,7 +122,7 @@ def __repr__(self): @property def _thread_id(self): - return self._thread_id_.get(self.curdoc) + return self._thread_id_.get(self.curdoc) if self.curdoc else None @_thread_id.setter def _thread_id(self, thread_id): diff --git a/panel/tests/conftest.py b/panel/tests/conftest.py index e709fb3a07..9f4182c1aa 100644 --- a/panel/tests/conftest.py +++ b/panel/tests/conftest.py @@ -89,7 +89,7 @@ def tmpdir(request, tmpdir_factory): @pytest.fixture() def html_server_session(): html = HTML('

Title

') - server = serve(html, port=5006, show=False, start=False) + server = serve(html, port=6000, show=False, start=False) session = pull_session( session_id='Test', url="http://localhost:{:d}/".format(server.port), @@ -105,7 +105,7 @@ def html_server_session(): @pytest.fixture() def markdown_server_session(): html = Markdown('#Title') - server = serve(html, port=5007, show=False, start=False) + server = serve(html, port=6001, show=False, start=False) session = pull_session( session_id='Test', url="http://localhost:{:d}/".format(server.port), @@ -152,8 +152,10 @@ def create_sessions(slugs, titles): def with_curdoc(): old_curdoc = state.curdoc state.curdoc = Document() - yield - state.curdoc = old_curdoc + try: + yield + finally: + state.curdoc = old_curdoc @contextmanager @@ -172,21 +174,28 @@ def server_cleanup(): """ Clean up after test fails """ - yield - state.kill_all_servers() - state._indicators.clear() - state._locations.clear() + try: + yield + finally: + state.kill_all_servers() + state._indicators.clear() + state._locations.clear() + state._curdoc = None @pytest.fixture def py_file(): tf = tempfile.NamedTemporaryFile(mode='w', suffix='.py') - yield tf - tf.close() + try: + yield tf + finally: + tf.close() @pytest.fixture def threads(): config.nthreads = 4 - yield 4 - config.nthreads = None + try: + yield 4 + finally: + config.nthreads = None diff --git a/panel/tests/test_server.py b/panel/tests/test_server.py index 8c7ca0d423..a3803ecdf0 100644 --- a/panel/tests/test_server.py +++ b/panel/tests/test_server.py @@ -22,7 +22,7 @@ def test_get_server(html_server_session): html, server, session = html_server_session - assert server.port == 5006 + assert server.port == 6000 root = session.document.roots[0] assert isinstance(root, BkHTML) assert root.text == '<h1>Title</h1>' @@ -230,8 +230,8 @@ def test_server_session_info(): def test_show_server_info(html_server_session, markdown_server_session): server_info = repr(state) - assert "localhost:5006 - HTML" in server_info - assert "localhost:5007 - Markdown" in server_info + assert "localhost:6000 - HTML" in server_info + assert "localhost:6001 - Markdown" in server_info def test_kill_all_servers(html_server_session, markdown_server_session): diff --git a/panel/tests/widgets/test_select.py b/panel/tests/widgets/test_select.py index a64ce1e822..5a4ed05474 100644 --- a/panel/tests/widgets/test_select.py +++ b/panel/tests/widgets/test_select.py @@ -73,7 +73,7 @@ def test_select_groups_list_options(document, comm): select._process_events({'value': str(groups['a'][1])}) assert select.value == groups['a'][1] - widget.value = str(groups['a'][0]) + select._process_events({'value': str(groups['a'][0])}) assert select.value == groups['a'][0] select.value = groups['a'][1] @@ -94,7 +94,7 @@ def test_select_groups_dict_options(document, comm): select._process_events({'value': str(groups['B']['c'])}) assert select.value == groups['B']['c'] - widget.value = str(groups['A']['b']) + select._process_events({'value': str(groups['A']['b'])}) assert select.value == groups['A']['b'] select.value = groups['A']['a'] From 0b111ba9341030cc759af2e1830636d4860b6e0f Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Wed, 5 Jan 2022 13:19:30 +0100 Subject: [PATCH 26/26] Ensure thread pool is cleaned up --- panel/tests/conftest.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/panel/tests/conftest.py b/panel/tests/conftest.py index 9f4182c1aa..672e34d174 100644 --- a/panel/tests/conftest.py +++ b/panel/tests/conftest.py @@ -181,7 +181,9 @@ def server_cleanup(): state._indicators.clear() state._locations.clear() state._curdoc = None - + if state._thread_pool is not None: + state._thread_pool.shutdown(wait=False) + state._thread_pool = None @pytest.fixture def py_file():