Skip to content

Commit

Permalink
feat: watch mode for run command
Browse files Browse the repository at this point in the history
Closes #101
  • Loading branch information
tumidi committed Aug 13, 2024
1 parent bd88ac4 commit dfe7517
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 55 deletions.
35 changes: 32 additions & 3 deletions questionpy_sdk/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,36 @@
# The QuestionPy SDK is free software released under terms of the MIT license. See LICENSE.md.
# (c) Technische Universität Berlin, innoCampus <info@isis.tu-berlin.de>

import asyncio
from pathlib import Path
from typing import TYPE_CHECKING

import click

from questionpy_sdk.commands._helper import get_package_location
from questionpy_sdk.watcher import Watcher
from questionpy_sdk.webserver.app import WebServer
from questionpy_server.worker.runtime.package_location import DirPackageLocation

if TYPE_CHECKING:
from collections.abc import Coroutine


async def run_watcher(pkg_path: Path, pkg_location: DirPackageLocation, host: str, port: int) -> None:
async with Watcher(pkg_path, pkg_location, host, port) as watcher:
await watcher.run_forever()


@click.command()
@click.argument("package")
def run(package: str) -> None:
@click.option(
"--host", "-h", "host", default="localhost", show_default=True, type=click.STRING, help="Host to listen on."
)
@click.option(
"--port", "-p", "port", default=8080, show_default=True, type=click.IntRange(1024, 65535), help="Port to bind to."
)
@click.option("--watch", "-w", "watch", is_flag=True, help="Watch source directory and rebuild on changes.")
def run(package: str, host: str, port: int, *, watch: bool) -> None:
"""Run a package.
\b
Expand All @@ -22,5 +41,15 @@ def run(package: str) -> None:
- a source directory (built on-the-fly).
""" # noqa: D301
pkg_path = Path(package).resolve()
web_server = WebServer(get_package_location(package, pkg_path))
web_server.start_server()
pkg_location = get_package_location(package, pkg_path)
coro: Coroutine

if watch:
if not isinstance(pkg_location, DirPackageLocation) or pkg_path == pkg_location.path:
msg = "The --watch option only works with source directories."
raise click.BadParameter(msg)
coro = run_watcher(pkg_path, pkg_location, host=host, port=port)
else:
coro = WebServer(pkg_location, host=host, port=port).run_forever()

asyncio.run(coro)
2 changes: 1 addition & 1 deletion questionpy_sdk/package/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from questionpy_sdk.package.errors import PackageBuildError
from questionpy_sdk.package.source import PackageSource

log = logging.getLogger(__name__)
log = logging.getLogger("questionpy-sdk:builder")


class PackageBuilderBase(AbstractContextManager):
Expand Down
174 changes: 174 additions & 0 deletions questionpy_sdk/watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# This file is part of the QuestionPy SDK. (https://questionpy.org)
# The QuestionPy SDK is free software released under terms of the MIT license. See LICENSE.md.
# (c) Technische Universität Berlin, innoCampus <info@isis.tu-berlin.de>

import asyncio
import logging
from collections.abc import Awaitable, Callable
from contextlib import AbstractAsyncContextManager
from pathlib import Path
from types import TracebackType
from typing import TYPE_CHECKING, Self

from watchdog.events import (
FileClosedEvent,
FileOpenedEvent,
FileSystemEvent,
FileSystemEventHandler,
FileSystemMovedEvent,
)
from watchdog.observers import Observer
from watchdog.utils.event_debouncer import EventDebouncer

from questionpy_common.constants import DIST_DIR
from questionpy_sdk.package.builder import DirPackageBuilder
from questionpy_sdk.package.errors import PackageBuildError, PackageSourceValidationError
from questionpy_sdk.package.source import PackageSource
from questionpy_sdk.webserver.app import WebServer
from questionpy_server.worker.runtime.package_location import DirPackageLocation

if TYPE_CHECKING:
from watchdog.observers.api import ObservedWatch

log = logging.getLogger("questionpy-sdk:watcher")

_DEBOUNCE_INTERVAL = 0.5 # seconds


class _EventHandler(FileSystemEventHandler):
"""Debounces events for watchdog file monitoring, ignoring events in the `dist` directory."""

def __init__(
self, loop: asyncio.AbstractEventLoop, notify_callback: Callable[[], Awaitable[None]], watch_path: Path
) -> None:
self._loop = loop
self._notify_callback = notify_callback
self._watch_path = watch_path

self._event_debouncer = EventDebouncer(_DEBOUNCE_INTERVAL, self._on_file_changes)

def start(self) -> None:
self._event_debouncer.start()

def stop(self) -> None:
if self._event_debouncer.is_alive():
self._event_debouncer.stop()
self._event_debouncer.join()

def dispatch(self, event: FileSystemEvent) -> None:
# filter events and debounce
if not self._ignore_event(event):
self._event_debouncer.handle_event(event)

def _on_file_changes(self, events: list[FileSystemEvent]) -> None:
# skip synchronization hassle by delegating this to the event loop in the main thread
asyncio.run_coroutine_threadsafe(self._notify_callback(), self._loop)

def _ignore_event(self, event: FileSystemEvent) -> bool:
"""Ignores events that should not trigger a rebuild.
Args:
event: The event to check.
Returns:
`True` if event should be ignored, otherwise `False`.
"""
if isinstance(event, FileOpenedEvent | FileClosedEvent):
return True

# ignore events events in `dist` dir
relevant_path = event.dest_path if isinstance(event, FileSystemMovedEvent) else event.src_path
try:
return Path(relevant_path).relative_to(self._watch_path).parts[0] == DIST_DIR
except IndexError:
return False


class Watcher(AbstractAsyncContextManager):
"""Watch a package source path and rebuild package/restart server on file changes."""

def __init__(self, source_path: Path, pkg_location: DirPackageLocation, host: str, port: int) -> None:
self._source_path = source_path
self._pkg_location = pkg_location
self._host = host
self._port = port

self._event_handler = _EventHandler(asyncio.get_running_loop(), self._notify, self._source_path)
self._observer = Observer()
self._webserver = WebServer(self._pkg_location, host=self._host, port=self._port)
self._on_change_event = asyncio.Event()
self._watch: ObservedWatch | None = None

async def __aenter__(self) -> Self:
self._event_handler.start()
self._observer.start()
log.info("Watching '%s' for changes...", self._source_path)

return self

async def __aexit__(
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None
) -> None:
if self._observer.is_alive():
self._observer.stop()
self._event_handler.stop()
await self._webserver.stop_server()

def _schedule(self) -> None:
if self._watch is None:
log.debug("Starting file watching...")
self._watch = self._observer.schedule(self._event_handler, self._source_path, recursive=True)

def _unschedule(self) -> None:
if self._watch:
log.debug("Stopping file watching...")
self._observer.unschedule(self._watch)
self._watch = None

async def _notify(self) -> None:
self._on_change_event.set()

async def run_forever(self) -> None:
try:
await self._webserver.start_server()
except Exception:
log.exception("Failed to start webserver. The exception was:")
# When user messed up the their package on initial run, we just bail out.
return

self._schedule()

while True:
await self._on_change_event.wait()

# Try to rebuild package and restart web server which might fail.
self._unschedule()
await self._rebuild_and_restart()
self._schedule()

self._on_change_event.clear()

async def _rebuild_and_restart(self) -> None:
log.info("File changes detected. Rebuilding package...")

# Stop webserver.
try:
await self._webserver.stop_server()
except Exception:
log.exception("Failed to stop web server. The exception was:")
raise # Should not happen, thus we're propagating.

# Build package.
try:
package_source = PackageSource(self._source_path)
with DirPackageBuilder(package_source) as builder:
builder.write_package()
except (PackageBuildError, PackageSourceValidationError):
log.exception("Failed to build package. The exception was:")
return

# Start server.
try:
await self._webserver.start_server()
except Exception:
log.exception("Failed to start web server. The exception was:")
73 changes: 54 additions & 19 deletions questionpy_sdk/webserver/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# This file is part of the QuestionPy SDK. (https://questionpy.org)
# The QuestionPy SDK is free software released under terms of the MIT license. See LICENSE.md.
# (c) Technische Universität Berlin, innoCampus <info@isis.tu-berlin.de>
import asyncio
import logging
import traceback
from enum import StrEnum
from functools import cached_property
Expand All @@ -22,6 +24,8 @@
if TYPE_CHECKING:
from questionpy_server.worker.worker import Worker

log = logging.getLogger("questionpy-sdk:web-server")


async def _extract_manifest(app: web.Application) -> None:
webserver = app[SDK_WEBSERVER_APP_KEY]
Expand Down Expand Up @@ -56,29 +60,37 @@ def __init__(
self,
package_location: PackageLocation,
state_storage_path: Path = Path(__file__).parent / "question_state_storage",
host: str = "localhost",
port: int = 8080,
) -> None:
# We import here, so we don't have to work around circular imports.
from questionpy_sdk.webserver.routes.attempt import routes as attempt_routes # noqa: PLC0415
from questionpy_sdk.webserver.routes.options import routes as options_routes # noqa: PLC0415
from questionpy_sdk.webserver.routes.worker import routes as worker_routes # noqa: PLC0415

self.package_location = package_location
self._state_storage_root = state_storage_path
self._host = host
self._port = port

self.web_app = web.Application()
self.web_app[SDK_WEBSERVER_APP_KEY] = self
self._web_app: web.Application | None = None
self._runner: web.AppRunner | None = None
self.worker_pool: WorkerPool = WorkerPool(1, 500 * MiB, worker_type=ThreadWorker)

self.web_app.add_routes(attempt_routes)
self.web_app.add_routes(options_routes)
self.web_app.add_routes(worker_routes)
self.web_app.router.add_static("/static", Path(__file__).parent / "static")
async def start_server(self) -> None:
if self._web_app:
msg = "Web app is already running"
raise RuntimeError(msg)

self.web_app.on_startup.append(_extract_manifest)
self.web_app.middlewares.append(_invalid_question_state_middleware)
self._web_app = self._create_webapp()
self._runner = web.AppRunner(self._web_app)
await self._runner.setup()
await web.TCPSite(self._runner, self._host, self._port).start()

jinja2_extensions = ["jinja2.ext.do"]
aiohttp_jinja2.setup(self.web_app, loader=PackageLoader(__package__), extensions=jinja2_extensions)
self.worker_pool: WorkerPool = WorkerPool(1, 500 * MiB, worker_type=ThreadWorker)
async def stop_server(self) -> None:
if self._runner:
await self._runner.cleanup()
self._web_app = None
self._runner = None

async def run_forever(self) -> None:
await self.start_server()
await asyncio.Event().wait() # run forever

def read_state_file(self, filename: StateFilename) -> str | None:
try:
Expand All @@ -97,12 +109,35 @@ def delete_state_files(self, filename_1: StateFilename, *filenames: StateFilenam
# Remove package state dir if it's now empty.
self._package_state_dir.rmdir()

def start_server(self) -> None:
web.run_app(self.web_app)
def _create_webapp(self) -> web.Application:
# We import here, so we don't have to work around circular imports.
from questionpy_sdk.webserver.routes.attempt import routes as attempt_routes # noqa: PLC0415
from questionpy_sdk.webserver.routes.options import routes as options_routes # noqa: PLC0415
from questionpy_sdk.webserver.routes.worker import routes as worker_routes # noqa: PLC0415

app = web.Application()
app[SDK_WEBSERVER_APP_KEY] = self

app.add_routes(attempt_routes)
app.add_routes(options_routes)
app.add_routes(worker_routes)
app.router.add_static("/static", Path(__file__).parent / "static")

app.on_startup.append(_extract_manifest)
app.middlewares.append(_invalid_question_state_middleware)

jinja2_extensions = ["jinja2.ext.do"]
aiohttp_jinja2.setup(app, loader=PackageLoader(__package__), extensions=jinja2_extensions)

return app

@cached_property
def _package_state_dir(self) -> Path:
manifest = self.web_app[MANIFEST_APP_KEY]
if self._web_app is None:
msg = "Web app not initialized"
raise RuntimeError(msg)

manifest = self._web_app[MANIFEST_APP_KEY]
return self._state_storage_root / f"{manifest.namespace}-{manifest.short_name}-{manifest.version}"


Expand Down
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# The QuestionPy SDK is free software released under terms of the MIT license. See LICENSE.md.
# (c) Technische Universität Berlin, innoCampus <info@isis.tu-berlin.de>

from collections.abc import Callable
from pathlib import Path
from shutil import copytree

Expand All @@ -20,3 +21,8 @@ def source_path(request: pytest.FixtureRequest, tmp_path: Path) -> Path:
copytree(src_path, dest_path, ignore=lambda src, names: (DIST_DIR,))

return dest_path


@pytest.fixture
def port(unused_tcp_port_factory: Callable) -> int:
return unused_tcp_port_factory()
Loading

0 comments on commit dfe7517

Please sign in to comment.