Skip to content

Commit

Permalink
Add server video stream tutorial (#6727)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcSkovMadsen authored Jun 25, 2024
1 parent 5a60261 commit b3a2484
Show file tree
Hide file tree
Showing 3 changed files with 606 additions and 27 deletions.
281 changes: 255 additions & 26 deletions doc/how_to/concurrency/manual_threading.md
Original file line number Diff line number Diff line change
@@ -1,44 +1,273 @@
# Set up Manual Threading
# Set Up Manual Threading

Enabling threading in Panel like we saw in the [automatic threading guide](threading) is a simple way to achieve concurrency, however sometimes we need more control. Below we will demonstrate an example of a `Thread` which we start in the background to process items we put in a queue for processing. We simulate the processing with a `time.sleep` but it could be any long-running computation. The `threading.Condition` allows us to manipulate the global shared `queue`.
Enabling threading in Panel, as demonstrated in the [automatic threading guide](threading.md), provides a simple method to achieve concurrency. However, there are situations where greater control is necessary.

Below, we will demonstrate how to safely implement threads either per session or globally across multiple sessions.

## Session Thread

This section illustrates how to use a `Thread` to process tasks within a queue, with one thread initiated per session to handle tasks individually per session.

We simulate task processing using `time.sleep`, but this could represent any long-running computation.

```python
import datetime
import threading
import time

from typing import Callable

import param

import panel as pn

pn.extension()

class SessionTaskRunner(pn.viewable.Viewer):
value = param.Parameter(
doc="The last result or exception", label="Last Result", constant=True
)

tasks: int = param.Integer(doc="Number of tasks in the queue", constant=True)
status: str = param.String(
default="The queue is empty", doc="Status message", constant=True
)

worker: Callable = param.Callable(
allow_None=False, doc="Function that processes the task"
)

def __init__(self, **params):
super().__init__(**params)
self._queue = []
self._stop_thread = False
self._event = threading.Event()
self._thread = threading.Thread(target=self._task_runner, daemon=True)
self._thread.start()
pn.state.on_session_destroyed(self._session_destroyed)

def _log(self, message):
print(f"{id(self)} - {message}")

def _task_runner(self):
while not self._stop_thread:
while self._queue:
with param.edit_constant(self):
self.status = f"Processing: {len(self._queue)} items left."
self._log(self.status)
task = self._queue.pop(0)
try:
result = self.worker(task)
with param.edit_constant(self):
self.value = result
except Exception as ex:
self.value = ex

with param.edit_constant(self):
self.tasks = len(self._queue)
self.status = self.param.status.default

self._event.clear()
if not self._queue and not self._stop_thread:
self._log("Waiting for a task")
self._event.wait()

self._log("Finished Task Runner")

def _stop_thread_func(self):
self._log(f"{id(self)} - Stopping Task Runner")
self._stop_thread = True
self._event.set()

def _session_destroyed(self, session_context):
self._stop_thread_func()

def __del__(self):
self._stop_thread_func()

def __panel__(self):
return pn.Column(
f"## TaskRunner {id(self)}",
pn.pane.Str(self.param.status),
pn.pane.Str(pn.rx("Last Result: {value}").format(value=self.param.value)),
)

def append(self, task):
"""Appends a task to the queue"""
self._queue.append(task)
with param.edit_constant(self):
self.tasks = len(self._queue)
self._event.set()

```

We will now create a task runner and a callback that queues new tasks for processing when a button is clicked:

```python
def example_worker(task):
time.sleep(1)
return datetime.datetime.now()

task_runner = SessionTaskRunner(worker=example_worker)

def add_task(event):
task_runner.append("task")

button = pn.widgets.Button(name="Add Task", on_click=add_task, button_type="primary")

pn.Column(button, task_runner).servable()
```

Since processing occurs on a separate thread, the application remains responsive to further user interactions, such as queuing new tasks.

:::{note}
To use threading efficiently:

- We terminate the thread upon session destruction to prevent it from consuming resources indefinitely.
- We use daemon threads (`daemon=True`) to allow the server to be stopped using CTRL+C.
- We employ the `Event.wait` method for efficient task-waiting, which is more resource-efficient compared to repeatedly sleeping and checking for new tasks using `time.sleep`.

:::

## Global Thread

When we need to share data periodically across all sessions, it is often inefficient to fetch and process this data separately for each session.

Instead, we can utilize a single thread. When initiating global threads, it's crucial to avoid starting them multiple times, especially in sessions or modules subject to `--autoreload`. To circumvent this issue, we can globally share a worker or thread through the Panel cache (`pn.state.cache`).

Let's create a `GlobalTaskRunner` that accepts a function (`worker`) and executes it repeatedly, pausing for `sleep` seconds between each execution.

This worker can be used to ingest data from a database, the web, or any server resource.

```python
import datetime
import threading
import time

from typing import Callable

import param

import panel as pn

c = threading.Condition()
pn.extension()

button = pn.widgets.Button(name='Click to launch')
text = pn.widgets.StaticText()
class GlobalTaskRunner(pn.viewable.Viewer):
"""The GlobalTaskRunner creates a singleton instance for each key."""
value = param.Parameter(doc="The most recent result", label="Last Result", constant=True)
exception: Exception = param.ClassSelector(
class_=Exception,
allow_None=True,
doc="The most recent exception, if any",
label="Last Exception",
constant=True,
)
worker: Callable = param.Callable(
allow_None=False, doc="Function that generates a result"
)
seconds: float = param.Number(
default=1.0, doc="Interval between worker calls", bounds=(0.001, None)
)
key: str = param.String(allow_None=False, constant=True)

queue = []
_global_task_runner_key = "__global_task_runners__"

def callback():
global queue
while True:
c.acquire()
for i, event in enumerate(queue):
text.value = f'Processing item {i+1} of {len(queue)} items in queue.'
... # Do something with the event
time.sleep(2)
queue.clear()
text.value = "Queue empty"
c.release()
time.sleep(1)
def __init__(self, key: str, **params):
super().__init__(key=key, **params)

thread = threading.Thread(target=callback)
thread.start()
self._stop_thread = False
self._thread = threading.Thread(target=self._task_runner, daemon=True)
self._thread.start()
self._log("Created")

def __new__(cls, key, **kwargs):
task_runners = pn.state.cache[cls._global_task_runner_key] = pn.state.cache.get(
cls._global_task_runner_key, {}
)
task_runner = task_runners.get(key, None)

if not task_runner:
task_runner = super(GlobalTaskRunner, cls).__new__(cls)
task_runners[key] = task_runner

return task_runner

def _log(self, message):
print(f"{id(self)} - {message}")

def _task_runner(self):
while not self._stop_thread:
try:
result = self.worker()
with param.edit_constant(self):
self.value = result
self.exception = None
except Exception as ex:
with param.edit_constant(self):
self.exception = ex
if not self._stop_thread:
self._log("Sleeping")
time.sleep(self.seconds)

self._log("Task Runner Finished")

def remove(self):
"""Securely stops and removes the GlobalThreadWorker."""
self._log("Removing")
self._stop_thread = True
self._thread.join()

cache = pn.state.cache.get(self._global_task_runner_key, {})
if self.key in cache:
del cache[self.key]
self._log("Removed")

@classmethod
def remove_all(cls):
"""Securely stops and removes all GlobalThreadWorkers."""
for gtw in list(pn.state.cache.get(cls._global_task_runner_key, {}).values()):
gtw.remove()
pn.state.cache[cls._global_task_runner_key] = {}

def __panel__(self):
return pn.Column(
f"## TaskRunner {id(self)}",
self.param.seconds,
pn.pane.Str(pn.rx("Last Result: {value}").format(value=self.param.value)),
pn.pane.Str(
pn.rx("Last Exception: {value}").format(value=self.param.exception)
),
)
```

Now we will create a callback that puts new items for processing on the queue when a button is clicked:
Let's test this with a simple example worker that generates timestamps every 0.33 seconds.

```python
def on_click(event):
queue.append(event)
def example_worker():
time.sleep(1)
return datetime.datetime.now()

task_runner = GlobalTaskRunner(
key="example-worker", worker=example_worker, seconds=0.33
)

results = []

button.on_click(on_click)
@pn.depends(task_runner.param.value)
def result_view(value):
results.append(value)
return f"{len(results)} results produced during this session"

pn.Row(button, text).servable()
pn.Column(
task_runner, result_view,
).servable()
```

Since the processing happens on a separate thread the application itself can still remain responsive to further user input (such as putting new items on the queue).
:::{note}

For efficient use of global threading:

- We employ the *singleton* principle (`__new__`) to create only one instance and thread per key.
- We use daemon threads (`daemon=True`) to ensure the server can be halted using CTRL+C.

:::
Loading

0 comments on commit b3a2484

Please sign in to comment.