diff --git a/changelog.rst b/changelog.rst index 1111f7d9..5cb0d924 100644 --- a/changelog.rst +++ b/changelog.rst @@ -11,7 +11,7 @@ Changelog - Drop support for Python 3.8 (`#1055 `__) - [core] Enable ``disallow_untyped_calls`` Mypy rule (`#1055 `__) - [core] Enforced usage of proper keyword-arguments (`#1057 `__) -- [core] Deleted the ``BaseObserverSubclassCallable`` class. Use ``type[BaseObserver]`` directly (`#1055 `__) +- [core] Renamed the ``BaseObserverSubclassCallable`` class to ``ObserverType`` (`#1055 `__) - [core] Improve typing references for events (`#1040 `__) - [inotify] Renamed the ``inotify_event_struct`` class to ``InotifyEventStruct`` (`#1055 `__) - [inotify] Renamed the ``UnsupportedLibc`` exception to ``UnsupportedLibcError`` (`#1057 `__) diff --git a/src/watchdog/observers/__init__.py b/src/watchdog/observers/__init__.py index e67af022..36677fda 100644 --- a/src/watchdog/observers/__init__.py +++ b/src/watchdog/observers/__init__.py @@ -38,7 +38,7 @@ import contextlib import warnings -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Protocol from watchdog.utils import UnsupportedLibcError, platform @@ -46,7 +46,11 @@ from watchdog.observers.api import BaseObserver -def _get_observer_cls() -> type[BaseObserver]: +class ObserverType(Protocol): + def __call__(self, *, timeout: int = ...) -> BaseObserver: ... + + +def _get_observer_cls() -> ObserverType: if platform.is_linux(): with contextlib.suppress(UnsupportedLibcError): from watchdog.observers.inotify import InotifyObserver diff --git a/src/watchdog/observers/api.py b/src/watchdog/observers/api.py index 07befce5..1dc42c6e 100644 --- a/src/watchdog/observers/api.py +++ b/src/watchdog/observers/api.py @@ -36,7 +36,7 @@ class ObservedWatch: Optional collection of :class:`watchdog.events.FileSystemEvent` to watch """ - def __init__(self, path: str | Path, *, recursive: bool, event_filter: list[FileSystemEvent] | None = None): + def __init__(self, path: str | Path, *, recursive: bool, event_filter: list[type[FileSystemEvent]] | None = None): self._path = str(path) if isinstance(path, Path) else path self._is_recursive = recursive self._event_filter = frozenset(event_filter) if event_filter is not None else None @@ -52,12 +52,12 @@ def is_recursive(self) -> bool: return self._is_recursive @property - def event_filter(self) -> frozenset[FileSystemEvent] | None: + def event_filter(self) -> frozenset[type[FileSystemEvent]] | None: """Collection of event types watched for the path""" return self._event_filter @property - def key(self) -> tuple[str, bool, frozenset[FileSystemEvent] | None]: + def key(self) -> tuple[str, bool, frozenset[type[FileSystemEvent]] | None]: return self.path, self.is_recursive, self.event_filter def __eq__(self, watch: object) -> bool: @@ -75,7 +75,7 @@ def __hash__(self) -> int: def __repr__(self) -> str: if self.event_filter is not None: - event_filter_str = "|".join(sorted(_cls.__name__ for _cls in self.event_filter)) # type: ignore[attr-defined] + event_filter_str = "|".join(sorted(_cls.__name__ for _cls in self.event_filter)) event_filter_str = f", event_filter={event_filter_str}" else: event_filter_str = "" @@ -111,7 +111,7 @@ def __init__( watch: ObservedWatch, *, timeout: int = DEFAULT_EMITTER_TIMEOUT, - event_filter: list[FileSystemEvent] | None = None, + event_filter: list[type[FileSystemEvent]] | None = None, ) -> None: super().__init__() self._event_queue = event_queue @@ -138,7 +138,7 @@ def queue_event(self, event: FileSystemEvent) -> None: An instance of :class:`watchdog.events.FileSystemEvent` or a subclass. """ - if self._event_filter is None or any(isinstance(event, cls) for cls in self._event_filter): # type: ignore[arg-type] + if self._event_filter is None or any(isinstance(event, cls) for cls in self._event_filter): self._event_queue.put((event, self.watch)) def queue_events(self, timeout: int) -> None: @@ -274,7 +274,7 @@ def schedule( path: str, *, recursive: bool = False, - event_filter: list[FileSystemEvent] | None = None, + event_filter: list[type[FileSystemEvent]] | None = None, ) -> ObservedWatch: """Schedules watching a path and calls appropriate methods specified in the given event handler in response to file system events. diff --git a/src/watchdog/observers/fsevents.py b/src/watchdog/observers/fsevents.py index 2edf6b10..0812e430 100644 --- a/src/watchdog/observers/fsevents.py +++ b/src/watchdog/observers/fsevents.py @@ -68,7 +68,7 @@ def __init__( watch: ObservedWatch, *, timeout: int = DEFAULT_EMITTER_TIMEOUT, - event_filter: list[FileSystemEvent] | None = None, + event_filter: list[type[FileSystemEvent]] | None = None, suppress_history: bool = False, ) -> None: super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter) @@ -329,7 +329,7 @@ def schedule( path: str, *, recursive: bool = False, - event_filter: list[FileSystemEvent] | None = None, + event_filter: list[type[FileSystemEvent]] | None = None, ) -> ObservedWatch: # Fix for issue #26: Trace/BPT error when given a unicode path # string. https://github.com/gorakhargosh/watchdog/issues#issue/26 diff --git a/src/watchdog/observers/fsevents2.py b/src/watchdog/observers/fsevents2.py index c21f6b28..cca7edba 100644 --- a/src/watchdog/observers/fsevents2.py +++ b/src/watchdog/observers/fsevents2.py @@ -188,7 +188,7 @@ def __init__( watch: ObservedWatch, *, timeout: int = DEFAULT_EMITTER_TIMEOUT, - event_filter: list[FileSystemEvent] | None = None, + event_filter: list[type[FileSystemEvent]] | None = None, ): super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter) self._fsevents = FSEventsQueue(watch.path) diff --git a/src/watchdog/observers/inotify.py b/src/watchdog/observers/inotify.py index 6627a6c1..36dae47b 100644 --- a/src/watchdog/observers/inotify.py +++ b/src/watchdog/observers/inotify.py @@ -107,7 +107,7 @@ def __init__( watch: ObservedWatch, *, timeout: int = DEFAULT_EMITTER_TIMEOUT, - event_filter: list[FileSystemEvent] | None = None, + event_filter: list[type[FileSystemEvent]] | None = None, ) -> None: super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter) self._lock = threading.Lock() @@ -207,11 +207,11 @@ def get_event_mask_from_filter(self) -> int | None: event_mask = InotifyConstants.IN_DELETE_SELF for cls in self._event_filter: - if cls in {DirMovedEvent, FileMovedEvent}: # type: ignore[comparison-overlap] + if cls in {DirMovedEvent, FileMovedEvent}: event_mask |= InotifyConstants.IN_MOVE - elif cls in {DirCreatedEvent, FileCreatedEvent}: # type: ignore[comparison-overlap] + elif cls in {DirCreatedEvent, FileCreatedEvent}: event_mask |= InotifyConstants.IN_MOVE | InotifyConstants.IN_CREATE - elif cls is DirModifiedEvent: # type: ignore[comparison-overlap] + elif cls is DirModifiedEvent: event_mask |= ( InotifyConstants.IN_MOVE | InotifyConstants.IN_ATTRIB @@ -219,15 +219,15 @@ def get_event_mask_from_filter(self) -> int | None: | InotifyConstants.IN_CREATE | InotifyConstants.IN_CLOSE_WRITE ) - elif cls is FileModifiedEvent: # type: ignore[comparison-overlap] + elif cls is FileModifiedEvent: event_mask |= InotifyConstants.IN_ATTRIB | InotifyConstants.IN_MODIFY - elif cls in {DirDeletedEvent, FileDeletedEvent}: # type: ignore[comparison-overlap] + elif cls in {DirDeletedEvent, FileDeletedEvent}: event_mask |= InotifyConstants.IN_DELETE - elif cls is FileClosedEvent: # type: ignore[comparison-overlap] + elif cls is FileClosedEvent: event_mask |= InotifyConstants.IN_CLOSE_WRITE - elif cls is FileClosedNoWriteEvent: # type: ignore[comparison-overlap] + elif cls is FileClosedNoWriteEvent: event_mask |= InotifyConstants.IN_CLOSE_NOWRITE - elif cls is FileOpenedEvent: # type: ignore[comparison-overlap] + elif cls is FileOpenedEvent: event_mask |= InotifyConstants.IN_OPEN return event_mask diff --git a/src/watchdog/observers/kqueue.py b/src/watchdog/observers/kqueue.py index 053ae045..cd2e7d6f 100644 --- a/src/watchdog/observers/kqueue.py +++ b/src/watchdog/observers/kqueue.py @@ -403,7 +403,7 @@ def __init__( watch: ObservedWatch, *, timeout: int = DEFAULT_EMITTER_TIMEOUT, - event_filter: list[FileSystemEvent] | None = None, + event_filter: list[type[FileSystemEvent]] | None = None, stat: Callable = os.stat, ) -> None: super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter) diff --git a/src/watchdog/observers/polling.py b/src/watchdog/observers/polling.py index 020e636d..853d849e 100644 --- a/src/watchdog/observers/polling.py +++ b/src/watchdog/observers/polling.py @@ -53,7 +53,7 @@ def __init__( watch: ObservedWatch, *, timeout: int = DEFAULT_EMITTER_TIMEOUT, - event_filter: list[FileSystemEvent] | None = None, + event_filter: list[type[FileSystemEvent]] | None = None, stat: Callable = os.stat, listdir: Callable = os.scandir, ) -> None: diff --git a/src/watchdog/observers/read_directory_changes.py b/src/watchdog/observers/read_directory_changes.py index 84d87f41..660255a0 100644 --- a/src/watchdog/observers/read_directory_changes.py +++ b/src/watchdog/observers/read_directory_changes.py @@ -39,7 +39,7 @@ def __init__( watch: ObservedWatch, *, timeout: int = DEFAULT_EMITTER_TIMEOUT, - event_filter: list[FileSystemEvent] | None = None, + event_filter: list[type[FileSystemEvent]] | None = None, ) -> None: super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter) self._lock = threading.Lock() diff --git a/src/watchdog/utils/__init__.py b/src/watchdog/utils/__init__.py index 18d493ee..637d9888 100644 --- a/src/watchdog/utils/__init__.py +++ b/src/watchdog/utils/__init__.py @@ -21,6 +21,8 @@ if TYPE_CHECKING: from types import ModuleType + from watchdog.tricks import Trick + class UnsupportedLibcError(Exception): pass @@ -84,7 +86,7 @@ def load_module(module_name: str) -> ModuleType: return sys.modules[module_name] -def load_class(dotted_path: str) -> type: +def load_class(dotted_path: str) -> type[Trick]: """Loads and returns a class definition provided a dotted path specification the last part of the dotted path is the class name and there is at least one module name preceding the class name. diff --git a/src/watchdog/watchmedo.py b/src/watchdog/watchmedo.py index 3f99786b..2456c567 100644 --- a/src/watchdog/watchmedo.py +++ b/src/watchdog/watchmedo.py @@ -25,6 +25,7 @@ from typing import Callable from watchdog.events import FileSystemEventHandler + from watchdog.observers import ObserverType from watchdog.observers.api import BaseObserver @@ -257,7 +258,7 @@ def schedule_tricks(observer: BaseObserver, tricks: dict, pathname: str, *, recu ) def tricks_from(args: Namespace) -> None: """Command to execute tricks from a tricks configuration file.""" - observer_cls: type[BaseObserver] + observer_cls: ObserverType if args.debug_force_polling: from watchdog.observers.polling import PollingObserver @@ -288,7 +289,7 @@ def tricks_from(args: Namespace) -> None: add_to_sys_path(path_split(args.python_path)) observers = [] for tricks_file in args.files: - observer = observer_cls(timeout=args.timeout) # type: ignore[call-arg] + observer = observer_cls(timeout=args.timeout) if not os.path.exists(tricks_file): raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), tricks_file) @@ -361,7 +362,7 @@ def tricks_generate_yaml(args: Namespace) -> None: for trick_path in args.trick_paths: trick_cls = load_class(trick_path) - output.write(trick_cls.generate_yaml()) # type: ignore[attr-defined] + output.write(trick_cls.generate_yaml()) content = output.getvalue() output.close() @@ -466,7 +467,7 @@ def log(args: Namespace) -> None: ignore_directories=args.ignore_directories, ) - observer_cls: type[BaseObserver] + observer_cls: ObserverType if args.debug_force_polling: from watchdog.observers.polling import PollingObserver @@ -494,7 +495,7 @@ def log(args: Namespace) -> None: observer_cls = Observer - observer = observer_cls(timeout=args.timeout) # type: ignore[call-arg] + observer = observer_cls(timeout=args.timeout) observe_with(observer, handler, args.directories, recursive=args.recursive) @@ -590,7 +591,7 @@ def shell_command(args: Namespace) -> None: if not args.command: args.command = None - observer_cls: type[BaseObserver] + observer_cls: ObserverType if args.debug_force_polling: from watchdog.observers.polling import PollingObserver @@ -609,7 +610,7 @@ def shell_command(args: Namespace) -> None: wait_for_process=args.wait_for_process, drop_during_process=args.drop_during_process, ) - observer = observer_cls(timeout=args.timeout) # type: ignore[call-arg] + observer = observer_cls(timeout=args.timeout) observe_with(observer, handler, args.directories, recursive=args.recursive) @@ -707,7 +708,7 @@ def shell_command(args: Namespace) -> None: ) def auto_restart(args: Namespace) -> None: """Command to start a long-running subprocess and restart it on matched events.""" - observer_cls: type[BaseObserver] + observer_cls: ObserverType if args.debug_force_polling: from watchdog.observers.polling import PollingObserver @@ -757,7 +758,7 @@ def handler_termination_signal(_signum: signal._SIGNUM, _frame: object) -> None: restart_on_command_exit=args.restart_on_command_exit, ) handler.start() - observer = observer_cls(timeout=args.timeout) # type: ignore[call-arg] + observer = observer_cls(timeout=args.timeout) try: observe_with(observer, handler, args.directories, recursive=args.recursive) except WatchdogShutdownError: