Skip to content

Commit

Permalink
fix: types
Browse files Browse the repository at this point in the history
  • Loading branch information
BoboTiG committed Aug 22, 2024
1 parent 31b0c34 commit 9af32e0
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 35 deletions.
2 changes: 1 addition & 1 deletion changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Changelog
- Drop support for Python 3.8 (`#1055 <https://github.com/gorakhargosh/watchdog/pull/1055>`__)
- [core] Enable ``disallow_untyped_calls`` Mypy rule (`#1055 <https://github.com/gorakhargosh/watchdog/pull/1055>`__)
- [core] Enforced usage of proper keyword-arguments (`#1057 <https://github.com/gorakhargosh/watchdog/pull/1057>`__)
- [core] Deleted the ``BaseObserverSubclassCallable`` class. Use ``type[BaseObserver]`` directly (`#1055 <https://github.com/gorakhargosh/watchdog/pull/1055>`__)
- [core] Renamed the ``BaseObserverSubclassCallable`` class to ``ObserverType`` (`#1055 <https://github.com/gorakhargosh/watchdog/pull/1055>`__)
- [core] Improve typing references for events (`#1040 <https://github.com/gorakhargosh/watchdog/issues/1040>`__)
- [inotify] Renamed the ``inotify_event_struct`` class to ``InotifyEventStruct`` (`#1055 <https://github.com/gorakhargosh/watchdog/pull/1055>`__)
- [inotify] Renamed the ``UnsupportedLibc`` exception to ``UnsupportedLibcError`` (`#1057 <https://github.com/gorakhargosh/watchdog/pull/1057>`__)
Expand Down
8 changes: 6 additions & 2 deletions src/watchdog/observers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,19 @@

import contextlib
import warnings
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Protocol

from watchdog.utils import UnsupportedLibcError, platform

if TYPE_CHECKING:
from watchdog.observers.api import BaseObserver


def _get_observer_cls() -> type[BaseObserver]:
class ObserverType(Protocol):
def __call__(self, *, timeout: int = ...) -> BaseObserver: ...


def _get_observer_cls() -> ObserverType:
if platform.is_linux():
with contextlib.suppress(UnsupportedLibcError):
from watchdog.observers.inotify import InotifyObserver
Expand Down
14 changes: 7 additions & 7 deletions src/watchdog/observers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ObservedWatch:
Optional collection of :class:`watchdog.events.FileSystemEvent` to watch
"""

def __init__(self, path: str | Path, *, recursive: bool, event_filter: list[FileSystemEvent] | None = None):
def __init__(self, path: str | Path, *, recursive: bool, event_filter: list[type[FileSystemEvent]] | None = None):
self._path = str(path) if isinstance(path, Path) else path
self._is_recursive = recursive
self._event_filter = frozenset(event_filter) if event_filter is not None else None
Expand All @@ -52,12 +52,12 @@ def is_recursive(self) -> bool:
return self._is_recursive

@property
def event_filter(self) -> frozenset[FileSystemEvent] | None:
def event_filter(self) -> frozenset[type[FileSystemEvent]] | None:
"""Collection of event types watched for the path"""
return self._event_filter

@property
def key(self) -> tuple[str, bool, frozenset[FileSystemEvent] | None]:
def key(self) -> tuple[str, bool, frozenset[type[FileSystemEvent]] | None]:
return self.path, self.is_recursive, self.event_filter

def __eq__(self, watch: object) -> bool:
Expand All @@ -75,7 +75,7 @@ def __hash__(self) -> int:

def __repr__(self) -> str:
if self.event_filter is not None:
event_filter_str = "|".join(sorted(_cls.__name__ for _cls in self.event_filter)) # type: ignore[attr-defined]
event_filter_str = "|".join(sorted(_cls.__name__ for _cls in self.event_filter))
event_filter_str = f", event_filter={event_filter_str}"
else:
event_filter_str = ""
Expand Down Expand Up @@ -111,7 +111,7 @@ def __init__(
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[FileSystemEvent] | None = None,
event_filter: list[type[FileSystemEvent]] | None = None,
) -> None:
super().__init__()
self._event_queue = event_queue
Expand All @@ -138,7 +138,7 @@ def queue_event(self, event: FileSystemEvent) -> None:
An instance of :class:`watchdog.events.FileSystemEvent`
or a subclass.
"""
if self._event_filter is None or any(isinstance(event, cls) for cls in self._event_filter): # type: ignore[arg-type]
if self._event_filter is None or any(isinstance(event, cls) for cls in self._event_filter):
self._event_queue.put((event, self.watch))

def queue_events(self, timeout: int) -> None:
Expand Down Expand Up @@ -274,7 +274,7 @@ def schedule(
path: str,
*,
recursive: bool = False,
event_filter: list[FileSystemEvent] | None = None,
event_filter: list[type[FileSystemEvent]] | None = None,
) -> ObservedWatch:
"""Schedules watching a path and calls appropriate methods specified
in the given event handler in response to file system events.
Expand Down
4 changes: 2 additions & 2 deletions src/watchdog/observers/fsevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def __init__(
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[FileSystemEvent] | None = None,
event_filter: list[type[FileSystemEvent]] | None = None,
suppress_history: bool = False,
) -> None:
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
Expand Down Expand Up @@ -329,7 +329,7 @@ def schedule(
path: str,
*,
recursive: bool = False,
event_filter: list[FileSystemEvent] | None = None,
event_filter: list[type[FileSystemEvent]] | None = None,
) -> ObservedWatch:
# Fix for issue #26: Trace/BPT error when given a unicode path
# string. https://github.com/gorakhargosh/watchdog/issues#issue/26
Expand Down
2 changes: 1 addition & 1 deletion src/watchdog/observers/fsevents2.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def __init__(
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[FileSystemEvent] | None = None,
event_filter: list[type[FileSystemEvent]] | None = None,
):
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
self._fsevents = FSEventsQueue(watch.path)
Expand Down
18 changes: 9 additions & 9 deletions src/watchdog/observers/inotify.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def __init__(
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[FileSystemEvent] | None = None,
event_filter: list[type[FileSystemEvent]] | None = None,
) -> None:
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
self._lock = threading.Lock()
Expand Down Expand Up @@ -207,27 +207,27 @@ def get_event_mask_from_filter(self) -> int | None:
event_mask = InotifyConstants.IN_DELETE_SELF

for cls in self._event_filter:
if cls in {DirMovedEvent, FileMovedEvent}: # type: ignore[comparison-overlap]
if cls in {DirMovedEvent, FileMovedEvent}:
event_mask |= InotifyConstants.IN_MOVE
elif cls in {DirCreatedEvent, FileCreatedEvent}: # type: ignore[comparison-overlap]
elif cls in {DirCreatedEvent, FileCreatedEvent}:
event_mask |= InotifyConstants.IN_MOVE | InotifyConstants.IN_CREATE
elif cls is DirModifiedEvent: # type: ignore[comparison-overlap]
elif cls is DirModifiedEvent:
event_mask |= (
InotifyConstants.IN_MOVE
| InotifyConstants.IN_ATTRIB
| InotifyConstants.IN_MODIFY
| InotifyConstants.IN_CREATE
| InotifyConstants.IN_CLOSE_WRITE
)
elif cls is FileModifiedEvent: # type: ignore[comparison-overlap]
elif cls is FileModifiedEvent:
event_mask |= InotifyConstants.IN_ATTRIB | InotifyConstants.IN_MODIFY
elif cls in {DirDeletedEvent, FileDeletedEvent}: # type: ignore[comparison-overlap]
elif cls in {DirDeletedEvent, FileDeletedEvent}:
event_mask |= InotifyConstants.IN_DELETE
elif cls is FileClosedEvent: # type: ignore[comparison-overlap]
elif cls is FileClosedEvent:
event_mask |= InotifyConstants.IN_CLOSE_WRITE
elif cls is FileClosedNoWriteEvent: # type: ignore[comparison-overlap]
elif cls is FileClosedNoWriteEvent:
event_mask |= InotifyConstants.IN_CLOSE_NOWRITE
elif cls is FileOpenedEvent: # type: ignore[comparison-overlap]
elif cls is FileOpenedEvent:
event_mask |= InotifyConstants.IN_OPEN

return event_mask
Expand Down
2 changes: 1 addition & 1 deletion src/watchdog/observers/kqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ def __init__(
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[FileSystemEvent] | None = None,
event_filter: list[type[FileSystemEvent]] | None = None,
stat: Callable = os.stat,
) -> None:
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
Expand Down
2 changes: 1 addition & 1 deletion src/watchdog/observers/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[FileSystemEvent] | None = None,
event_filter: list[type[FileSystemEvent]] | None = None,
stat: Callable = os.stat,
listdir: Callable = os.scandir,
) -> None:
Expand Down
2 changes: 1 addition & 1 deletion src/watchdog/observers/read_directory_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[FileSystemEvent] | None = None,
event_filter: list[type[FileSystemEvent]] | None = None,
) -> None:
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
self._lock = threading.Lock()
Expand Down
4 changes: 3 additions & 1 deletion src/watchdog/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
if TYPE_CHECKING:
from types import ModuleType

from watchdog.tricks import Trick


class UnsupportedLibcError(Exception):
pass
Expand Down Expand Up @@ -84,7 +86,7 @@ def load_module(module_name: str) -> ModuleType:
return sys.modules[module_name]


def load_class(dotted_path: str) -> type:
def load_class(dotted_path: str) -> type[Trick]:
"""Loads and returns a class definition provided a dotted path
specification the last part of the dotted path is the class name
and there is at least one module name preceding the class name.
Expand Down
19 changes: 10 additions & 9 deletions src/watchdog/watchmedo.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from typing import Callable

from watchdog.events import FileSystemEventHandler
from watchdog.observers import ObserverType
from watchdog.observers.api import BaseObserver


Expand Down Expand Up @@ -257,7 +258,7 @@ def schedule_tricks(observer: BaseObserver, tricks: dict, pathname: str, *, recu
)
def tricks_from(args: Namespace) -> None:
"""Command to execute tricks from a tricks configuration file."""
observer_cls: type[BaseObserver]
observer_cls: ObserverType
if args.debug_force_polling:
from watchdog.observers.polling import PollingObserver

Expand Down Expand Up @@ -288,7 +289,7 @@ def tricks_from(args: Namespace) -> None:
add_to_sys_path(path_split(args.python_path))
observers = []
for tricks_file in args.files:
observer = observer_cls(timeout=args.timeout) # type: ignore[call-arg]
observer = observer_cls(timeout=args.timeout)

if not os.path.exists(tricks_file):
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), tricks_file)
Expand Down Expand Up @@ -361,7 +362,7 @@ def tricks_generate_yaml(args: Namespace) -> None:

for trick_path in args.trick_paths:
trick_cls = load_class(trick_path)
output.write(trick_cls.generate_yaml()) # type: ignore[attr-defined]
output.write(trick_cls.generate_yaml())

content = output.getvalue()
output.close()
Expand Down Expand Up @@ -466,7 +467,7 @@ def log(args: Namespace) -> None:
ignore_directories=args.ignore_directories,
)

observer_cls: type[BaseObserver]
observer_cls: ObserverType
if args.debug_force_polling:
from watchdog.observers.polling import PollingObserver

Expand Down Expand Up @@ -494,7 +495,7 @@ def log(args: Namespace) -> None:

observer_cls = Observer

observer = observer_cls(timeout=args.timeout) # type: ignore[call-arg]
observer = observer_cls(timeout=args.timeout)
observe_with(observer, handler, args.directories, recursive=args.recursive)


Expand Down Expand Up @@ -590,7 +591,7 @@ def shell_command(args: Namespace) -> None:
if not args.command:
args.command = None

observer_cls: type[BaseObserver]
observer_cls: ObserverType
if args.debug_force_polling:
from watchdog.observers.polling import PollingObserver

Expand All @@ -609,7 +610,7 @@ def shell_command(args: Namespace) -> None:
wait_for_process=args.wait_for_process,
drop_during_process=args.drop_during_process,
)
observer = observer_cls(timeout=args.timeout) # type: ignore[call-arg]
observer = observer_cls(timeout=args.timeout)
observe_with(observer, handler, args.directories, recursive=args.recursive)


Expand Down Expand Up @@ -707,7 +708,7 @@ def shell_command(args: Namespace) -> None:
)
def auto_restart(args: Namespace) -> None:
"""Command to start a long-running subprocess and restart it on matched events."""
observer_cls: type[BaseObserver]
observer_cls: ObserverType
if args.debug_force_polling:
from watchdog.observers.polling import PollingObserver

Expand Down Expand Up @@ -757,7 +758,7 @@ def handler_termination_signal(_signum: signal._SIGNUM, _frame: object) -> None:
restart_on_command_exit=args.restart_on_command_exit,
)
handler.start()
observer = observer_cls(timeout=args.timeout) # type: ignore[call-arg]
observer = observer_cls(timeout=args.timeout)
try:
observe_with(observer, handler, args.directories, recursive=args.recursive)
except WatchdogShutdownError:
Expand Down

0 comments on commit 9af32e0

Please sign in to comment.