From e7491b5c8d38df9036fb393c24935ca882a0319d Mon Sep 17 00:00:00 2001 From: Marc Skov Madsen Date: Tue, 25 Jun 2024 08:22:01 +0200 Subject: [PATCH] Add server video stream tutorial (#6727) --- doc/how_to/concurrency/manual_threading.md | 281 ++++++++++++-- .../intermediate/build_server_video_stream.md | 349 ++++++++++++++++++ doc/tutorials/intermediate/index.md | 3 +- 3 files changed, 606 insertions(+), 27 deletions(-) create mode 100644 doc/tutorials/intermediate/build_server_video_stream.md diff --git a/doc/how_to/concurrency/manual_threading.md b/doc/how_to/concurrency/manual_threading.md index 4661fea965..25c85d5804 100644 --- a/doc/how_to/concurrency/manual_threading.md +++ b/doc/how_to/concurrency/manual_threading.md @@ -1,44 +1,273 @@ -# Set up Manual Threading +# Set Up Manual Threading -Enabling threading in Panel like we saw in the [automatic threading guide](threading) is a simple way to achieve concurrency, however sometimes we need more control. Below we will demonstrate an example of a `Thread` which we start in the background to process items we put in a queue for processing. We simulate the processing with a `time.sleep` but it could be any long-running computation. The `threading.Condition` allows us to manipulate the global shared `queue`. +Enabling threading in Panel, as demonstrated in the [automatic threading guide](threading.md), provides a simple method to achieve concurrency. However, there are situations where greater control is necessary. + +Below, we will demonstrate how to safely implement threads either per session or globally across multiple sessions. + +## Session Thread + +This section illustrates how to use a `Thread` to process tasks within a queue, with one thread initiated per session to handle tasks individually per session. + +We simulate task processing using `time.sleep`, but this could represent any long-running computation. ```python +import datetime +import threading import time + +from typing import Callable + +import param + +import panel as pn + +pn.extension() + +class SessionTaskRunner(pn.viewable.Viewer): + value = param.Parameter( + doc="The last result or exception", label="Last Result", constant=True + ) + + tasks: int = param.Integer(doc="Number of tasks in the queue", constant=True) + status: str = param.String( + default="The queue is empty", doc="Status message", constant=True + ) + + worker: Callable = param.Callable( + allow_None=False, doc="Function that processes the task" + ) + + def __init__(self, **params): + super().__init__(**params) + self._queue = [] + self._stop_thread = False + self._event = threading.Event() + self._thread = threading.Thread(target=self._task_runner, daemon=True) + self._thread.start() + pn.state.on_session_destroyed(self._session_destroyed) + + def _log(self, message): + print(f"{id(self)} - {message}") + + def _task_runner(self): + while not self._stop_thread: + while self._queue: + with param.edit_constant(self): + self.status = f"Processing: {len(self._queue)} items left." + self._log(self.status) + task = self._queue.pop(0) + try: + result = self.worker(task) + with param.edit_constant(self): + self.value = result + except Exception as ex: + self.value = ex + + with param.edit_constant(self): + self.tasks = len(self._queue) + self.status = self.param.status.default + + self._event.clear() + if not self._queue and not self._stop_thread: + self._log("Waiting for a task") + self._event.wait() + + self._log("Finished Task Runner") + + def _stop_thread_func(self): + self._log(f"{id(self)} - Stopping Task Runner") + self._stop_thread = True + self._event.set() + + def _session_destroyed(self, session_context): + self._stop_thread_func() + + def __del__(self): + self._stop_thread_func() + + def __panel__(self): + return pn.Column( + f"## TaskRunner {id(self)}", + pn.pane.Str(self.param.status), + pn.pane.Str(pn.rx("Last Result: {value}").format(value=self.param.value)), + ) + + def append(self, task): + """Appends a task to the queue""" + self._queue.append(task) + with param.edit_constant(self): + self.tasks = len(self._queue) + self._event.set() + +``` + +We will now create a task runner and a callback that queues new tasks for processing when a button is clicked: + +```python +def example_worker(task): + time.sleep(1) + return datetime.datetime.now() + +task_runner = SessionTaskRunner(worker=example_worker) + +def add_task(event): + task_runner.append("task") + +button = pn.widgets.Button(name="Add Task", on_click=add_task, button_type="primary") + +pn.Column(button, task_runner).servable() +``` + +Since processing occurs on a separate thread, the application remains responsive to further user interactions, such as queuing new tasks. + +:::{note} +To use threading efficiently: + +- We terminate the thread upon session destruction to prevent it from consuming resources indefinitely. +- We use daemon threads (`daemon=True`) to allow the server to be stopped using CTRL+C. +- We employ the `Event.wait` method for efficient task-waiting, which is more resource-efficient compared to repeatedly sleeping and checking for new tasks using `time.sleep`. + +::: + +## Global Thread + +When we need to share data periodically across all sessions, it is often inefficient to fetch and process this data separately for each session. + +Instead, we can utilize a single thread. When initiating global threads, it's crucial to avoid starting them multiple times, especially in sessions or modules subject to `--autoreload`. To circumvent this issue, we can globally share a worker or thread through the Panel cache (`pn.state.cache`). + +Let's create a `GlobalTaskRunner` that accepts a function (`worker`) and executes it repeatedly, pausing for `sleep` seconds between each execution. + +This worker can be used to ingest data from a database, the web, or any server resource. + +```python +import datetime import threading +import time + +from typing import Callable + +import param + +import panel as pn -c = threading.Condition() +pn.extension() -button = pn.widgets.Button(name='Click to launch') -text = pn.widgets.StaticText() +class GlobalTaskRunner(pn.viewable.Viewer): + """The GlobalTaskRunner creates a singleton instance for each key.""" + value = param.Parameter(doc="The most recent result", label="Last Result", constant=True) + exception: Exception = param.ClassSelector( + class_=Exception, + allow_None=True, + doc="The most recent exception, if any", + label="Last Exception", + constant=True, + ) + worker: Callable = param.Callable( + allow_None=False, doc="Function that generates a result" + ) + seconds: float = param.Number( + default=1.0, doc="Interval between worker calls", bounds=(0.001, None) + ) + key: str = param.String(allow_None=False, constant=True) -queue = [] + _global_task_runner_key = "__global_task_runners__" -def callback(): - global queue - while True: - c.acquire() - for i, event in enumerate(queue): - text.value = f'Processing item {i+1} of {len(queue)} items in queue.' - ... # Do something with the event - time.sleep(2) - queue.clear() - text.value = "Queue empty" - c.release() - time.sleep(1) + def __init__(self, key: str, **params): + super().__init__(key=key, **params) -thread = threading.Thread(target=callback) -thread.start() + self._stop_thread = False + self._thread = threading.Thread(target=self._task_runner, daemon=True) + self._thread.start() + self._log("Created") + + def __new__(cls, key, **kwargs): + task_runners = pn.state.cache[cls._global_task_runner_key] = pn.state.cache.get( + cls._global_task_runner_key, {} + ) + task_runner = task_runners.get(key, None) + + if not task_runner: + task_runner = super(GlobalTaskRunner, cls).__new__(cls) + task_runners[key] = task_runner + + return task_runner + + def _log(self, message): + print(f"{id(self)} - {message}") + + def _task_runner(self): + while not self._stop_thread: + try: + result = self.worker() + with param.edit_constant(self): + self.value = result + self.exception = None + except Exception as ex: + with param.edit_constant(self): + self.exception = ex + if not self._stop_thread: + self._log("Sleeping") + time.sleep(self.seconds) + + self._log("Task Runner Finished") + + def remove(self): + """Securely stops and removes the GlobalThreadWorker.""" + self._log("Removing") + self._stop_thread = True + self._thread.join() + + cache = pn.state.cache.get(self._global_task_runner_key, {}) + if self.key in cache: + del cache[self.key] + self._log("Removed") + + @classmethod + def remove_all(cls): + """Securely stops and removes all GlobalThreadWorkers.""" + for gtw in list(pn.state.cache.get(cls._global_task_runner_key, {}).values()): + gtw.remove() + pn.state.cache[cls._global_task_runner_key] = {} + + def __panel__(self): + return pn.Column( + f"## TaskRunner {id(self)}", + self.param.seconds, + pn.pane.Str(pn.rx("Last Result: {value}").format(value=self.param.value)), + pn.pane.Str( + pn.rx("Last Exception: {value}").format(value=self.param.exception) + ), + ) ``` -Now we will create a callback that puts new items for processing on the queue when a button is clicked: +Let's test this with a simple example worker that generates timestamps every 0.33 seconds. ```python -def on_click(event): - queue.append(event) +def example_worker(): + time.sleep(1) + return datetime.datetime.now() + +task_runner = GlobalTaskRunner( + key="example-worker", worker=example_worker, seconds=0.33 +) + +results = [] -button.on_click(on_click) +@pn.depends(task_runner.param.value) +def result_view(value): + results.append(value) + return f"{len(results)} results produced during this session" -pn.Row(button, text).servable() +pn.Column( + task_runner, result_view, +).servable() ``` -Since the processing happens on a separate thread the application itself can still remain responsive to further user input (such as putting new items on the queue). +:::{note} + +For efficient use of global threading: + +- We employ the *singleton* principle (`__new__`) to create only one instance and thread per key. +- We use daemon threads (`daemon=True`) to ensure the server can be halted using CTRL+C. + +::: diff --git a/doc/tutorials/intermediate/build_server_video_stream.md b/doc/tutorials/intermediate/build_server_video_stream.md new file mode 100644 index 0000000000..16037aacc1 --- /dev/null +++ b/doc/tutorials/intermediate/build_server_video_stream.md @@ -0,0 +1,349 @@ +# Build a Server Side Video Camera Application + +Welcome to our tutorial on building a **server-side video camera application** using HoloViz Panel! In this fun and engaging guide, we'll walk you through the process of setting up a video stream from a camera connected to a server, not the user's machine. This approach uses Python's threading to handle real-time video processing without freezing the user interface. + +Let's dive into the code and see how it all comes together. + +:::{drowdown} Code + +`server_video_stream.py` + +```python +import threading +import time + +import cv2 as cv +import param + +from PIL import Image + +import panel as pn + +class CannotOpenCamera(Exception): + """Exception raised if the camera cannot be opened.""" + +class CannotReadCamera(Exception): + """Exception raised if the camera cannot be read.""" + + +class ServerVideoStream(pn.viewable.Viewer): + value = param.Parameter(doc="The current snapshot as a Pillow Image") + paused = param.Boolean(default=False, doc="Whether the video stream is paused") + fps = param.Number(10, doc="Frames per second", inclusive_bounds=(0, None)) + camera_index = param.Integer(0, doc="The index of the active camera") + + def __init__(self, **params): + super().__init__(**params) + + self._cameras = {} + + self._stop_thread = False + self._thread = threading.Thread(target=self._take_images) + self._thread.daemon = True + + def start(self, camera_indices=None): + if camera_indices: + for index in camera_indices: + self.get_camera(index) + + if not self._thread.is_alive(): + self._thread.start() + + def get_camera(self, index): + if index in self._cameras: + return self._cameras[index] + + cap = cv.VideoCapture(index) + + if not cap.isOpened(): + raise CannotOpenCamera(f"Cannot open the camera {index}") + + self._cameras[index] = cap + return cap + + @staticmethod + def _cv2_to_pil(bgr_image): + rgb_image = cv.cvtColor(bgr_image, cv.COLOR_BGR2RGB) + image = Image.fromarray(rgb_image) + return image + + def _take_image(self): + camera = self.get_camera(self.camera_index) + ret, frame = camera.read() + if not ret: + raise CannotReadCamera("Ensure the camera exists and is not in use by other processes.") + else: + self.value = self._cv2_to_pil(frame) + + def _take_images(self): + while not self._stop_thread: + start_time = time.time() + if not self.paused: + try: + self._take_image() + except Exception as ex: + print("Error: Could not capture image.") + print(ex) + + if self.fps > 0: + interval = 1 / self.fps + elapsed_time = time.time() - start_time + sleep_time = max(0, interval - elapsed_time) + time.sleep(sleep_time) + + def __del__(self): + self._stop_thread = True + if self._thread.is_alive(): + self._thread.join() + for camera in self._cameras.values(): + camera.release() + cv.destroyAllWindows() + + def __panel__(self): + settings = pn.Column( + self.param.paused, + self.param.fps, + self.param.camera_index, + width=300, + ) + image = pn.pane.Image(self.param.value, sizing_mode="stretch_both") + return pn.Row(settings, image) + + +server_video_stream = ServerVideoStream() +server_video_stream.start() +``` + +`app.py` + +```python +import panel as pn +from server_video_stream import server_video_stream + +pn.extension() + +server_video_stream.servable() +``` + +::: + +## Install the Dependencies + +To run the application, you'll need several packages: + +- **OpenCV** (`opencv`): A library for computer vision tasks, here used to interface with the camera. +- **Panel** (`panel`): A high-level app and dashboarding solution for Python, used to create the web interface. +- **Pillow** (`pillow`): An imaging library, used here to convert images from OpenCV format to a format suitable for web display. + +You can install these using conda or pip: + +::::{tab-set} + +:::{tab-item} conda +:sync: conda + +``` bash +conda install -y -c conda-forge opencv panel pillow +``` + +::: + +:::{tab-item} pip +:sync: pip + +``` bash +pip install opencv panel pillow +``` + +::: + +:::: + +## Build the App + +### File Breakdown + +This project consists of two Python files: + +- `server_video_stream.py` - Contains the reusable `ServerVideoStream` component. +- `app.py` - A simple script that utilizes the `server_video_stream` component. + +### Breakdown of `server_video_stream.py` + +#### Importing Libraries and Handling Exceptions + +```python +import threading +import time + +import cv2 as cv +import param + +from PIL import Image + +import panel as pn + +class CannotOpenCamera(Exception): + """Exception raised if the camera cannot be opened.""" + +class CannotReadCamera(Exception): + """Exception raised if the camera cannot be read.""" +``` + +We begin by importing the necessary libraries: + +- **threading**: For running background tasks that do not block the main program. +- **time**: To manage frame rates and delays. +- **cv2**: The [OpenCV](https://docs.opencv.org/4.x/) library for capturing and processing video frames. +- **param**: A component of the HoloViz ecosystem for declaring parameters. +- **Image from PIL**: To convert images from OpenCV's format to a web-friendly format. +- **panel**: The Panel library for creating web interfaces. + +We also define several custom exceptions to manage specific errors related to camera operations. + +#### Defining the `ServerVideoStream` Class + +```python +class ServerVideoStream(pn.viewable.Viewer): + value = param.Parameter(doc="The current snapshot as a Pillow Image") + paused = param.Boolean(default=False, doc="Whether the video stream is paused") + fps = param.Number(10, doc="Frames per second", inclusive_bounds=(0, None)) + camera_index = param.Integer(0, doc="The index of the active camera") +``` + +The `ServerVideoStream` class extends `pn.viewable.Viewer`, enabling its display in a Panel app. It includes parameters to control the stream: + +- **value**: Holds the current video frame. +- **paused**: A toggle to pause or resume the video capture. +- **fps**: Determines the frame rate of the video stream. +- **camera_index**: Specifies which camera to use if multiple are available. + +#### Initializing and Managing Cameras + +```python + def __init__(self, **params): + super().__init__(**params) + + self._cameras = {} + + self._stop_thread = False + self._thread = threading.Thread(target=self._take_images) + self._thread.daemon = True + + def start(self, camera_indices): + if camera_indices: + for index in camera_indices: + self.get_camera(index) + + if not self._thread.is_alive(): + self._thread.start() +``` + +The constructor initializes a thread for capturing images and managing the stream. The `start` method activates the cameras and starts the thread if it isn't already running. + +#### Capturing and Displaying Images + +```python + def get_camera(self, index): + if index in self._cameras: + return self._cameras[index] + + cap = cv.VideoCapture(index) + if not cap.isOpened(): + raise CannotOpenCamera(f"Cannot open the camera {index}") + + self._cameras[index] = cap + return cap + + @staticmethod + def _cv2_to_pil(bgr_image): + rgb_image = cv.cvtColor(bgr_image, cv.COLOR_BGR2RGB) + image = Image.fromarray(rgb_image) + return image + + def _take_image(self): + camera = self.get_camera(self.camera_index) + ret, frame = camera.read() + if not ret: + raise CannotReadCamera("Ensure the camera exists and is not in use by other processes.") + else: + self.value = self._cv2_to_pil(frame) + + def _take_images(self): + while not self._stop_thread: + start_time = time.time() + if not self.paused: + try: + self._take_image() + except Exception as ex: + print("Error: Could not capture image.") + print(ex) + + if self.fps > 0: + interval = 1 / self.fps + elapsed_time = time.time() - start_time + sleep_time = max(0, interval - elapsed_time) + time.sleep(sleep_time) + + def __del__(self): + self._stop_thread = True + if self._thread.is_alive(): + self._thread.join() + for camera in self._cameras.values(): + camera.release() + cv.destroyAllWindows() +``` + +The `_take_images` method runs in a loop within a separate thread, capturing images at the specified fps unless paused. This setup ensures the app remains responsive by not blocking the main thread. + +#### Display Setup + +```python + def __panel__(self): + settings = pn.Column( + self.param.paused, + self.param.fps, + self.param.camera_index, + width=300, + ) + image = pn.pane.Image(self.param.value, sizing_mode="stretch_both") + return pn.Row(settings, image) +``` + +The `__panel__` method defines how the class is rendered in a web page. It creates a user interface with controls for the camera settings and displays the current video frame. + +#### Sharing the Video Stream + +```python +server_video_stream = ServerVideoStream() +server_video_stream.start() +``` + +To share the video camera between all user sessions, we utilize a single instance. + +The `server_video_stream` is now ready for use in either a single or multi-page application. You can bind to its value or include the component in a layout. + +### `app.py` - Making It Servable + +```python +import panel as pn +from server_video_stream import server_video_stream + +pn.extension() + +server_video_stream.servable() +``` + +This script initializes the Panel extension and makes the `server_video_stream` instance available as a web app. + +Try serving the app with + +```bash +panel serve app.py +``` + +## References + +### How-To Guides + +- [Manual Threading](../../how_to/concurrency/manual_threading.md) diff --git a/doc/tutorials/intermediate/index.md b/doc/tutorials/intermediate/index.md index 1158719d7a..91ee340fcb 100644 --- a/doc/tutorials/intermediate/index.md +++ b/doc/tutorials/intermediate/index.md @@ -70,7 +70,7 @@ Now that you've mastered the more advanced concepts of Panel, it's time to put y - **[Create a Todo App](build_todo.md):** Create a Todo App using a class based approach. - **[Test a Todo App](test_todo.md):** Learn how to test a class based Panel app. - **Serve Apps without a Server:** Explore the realm of WASM to serve your apps without traditional servers. -- **Build a Streaming Dashboard:** Engineer a high-performing streaming dashboard employing a *producer/consumer* architecture. +- **[Build a Server Video Stream](build_server_video_stream.md):** Utilize threading to set up a video stream from a camera connected to a server without blocking the UI. ```{toctree} :titlesonly: @@ -86,4 +86,5 @@ serve advanced_layouts build_todo test_todo +build_server_video_stream ```