diff --git a/pyproject.toml b/pyproject.toml index 0a074bb..1de0f4b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ optional-dependencies.testing = [ "coverage>=7.3.2", "diff-cover>=8.0.1", "pytest>=7.4.3", + "pytest-asyncio>=0.21", "pytest-cov>=4.1", "pytest-mock>=3.12", "pytest-timeout>=2.2", diff --git a/src/filelock/__init__.py b/src/filelock/__init__.py index 006299d..c9d8c5b 100644 --- a/src/filelock/__init__.py +++ b/src/filelock/__init__.py @@ -17,6 +17,13 @@ from ._soft import SoftFileLock from ._unix import UnixFileLock, has_fcntl from ._windows import WindowsFileLock +from .asyncio import ( + AsyncAcquireReturnProxy, + AsyncSoftFileLock, + AsyncUnixFileLock, + AsyncWindowsFileLock, + BaseAsyncFileLock, +) from .version import version #: version of the project as a string @@ -25,23 +32,34 @@ if sys.platform == "win32": # pragma: win32 cover _FileLock: type[BaseFileLock] = WindowsFileLock + _AsyncFileLock: type[BaseAsyncFileLock] = AsyncWindowsFileLock else: # pragma: win32 no cover # noqa: PLR5501 if has_fcntl: _FileLock: type[BaseFileLock] = UnixFileLock + _AsyncFileLock: type[BaseAsyncFileLock] = AsyncUnixFileLock else: _FileLock = SoftFileLock + _AsyncFileLock = AsyncSoftFileLock if warnings is not None: warnings.warn("only soft file lock is available", stacklevel=2) if TYPE_CHECKING: FileLock = SoftFileLock + AsyncFileLock = AsyncSoftFileLock else: #: Alias for the lock, which should be used for the current platform. FileLock = _FileLock + AsyncFileLock = _AsyncFileLock __all__ = [ "AcquireReturnProxy", + "AsyncAcquireReturnProxy", + "AsyncFileLock", + "AsyncSoftFileLock", + "AsyncUnixFileLock", + "AsyncWindowsFileLock", + "BaseAsyncFileLock", "BaseFileLock", "FileLock", "SoftFileLock", diff --git a/src/filelock/_api.py b/src/filelock/_api.py index fd87972..de34317 100644 --- a/src/filelock/_api.py +++ b/src/filelock/_api.py @@ -91,7 +91,7 @@ def __new__( # noqa: PLR0913 *, blocking: bool = True, # noqa: ARG003 is_singleton: bool = False, - **kwargs: dict[str, Any], # capture remaining kwargs for subclasses # noqa: ARG003 + **kwargs: Any, # capture remaining kwargs for subclasses # noqa: ARG003, ANN401 ) -> Self: """Create a new lock object or if specified return the singleton instance for the lock file.""" if not is_singleton: diff --git a/src/filelock/asyncio.py b/src/filelock/asyncio.py new file mode 100644 index 0000000..feedd27 --- /dev/null +++ b/src/filelock/asyncio.py @@ -0,0 +1,314 @@ +"""An asyncio-based implementation of the file lock.""" + +from __future__ import annotations + +import asyncio +import contextlib +import logging +import os +import time +from dataclasses import dataclass +from threading import local +from typing import TYPE_CHECKING, Any, Callable, NoReturn + +from ._api import BaseFileLock, FileLockContext +from ._error import Timeout +from ._soft import SoftFileLock +from ._unix import UnixFileLock +from ._windows import WindowsFileLock + +if TYPE_CHECKING: + import sys + from concurrent import futures + from types import TracebackType + + if sys.version_info >= (3, 11): # pragma: no cover (py311+) + from typing import Self + else: # pragma: no cover ( None: # noqa: D107 + self.lock = lock + + async def __aenter__(self) -> BaseAsyncFileLock: # noqa: D105 + return self.lock + + async def __aexit__( # noqa: D105 + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + await self.lock.release() + + +class BaseAsyncFileLock(BaseFileLock): + """Base class for asynchronous file locks.""" + + def __init__( # noqa: PLR0913 + self, + lock_file: str | os.PathLike[str], + timeout: float = -1, + mode: int = 0o644, + thread_local: bool = False, # noqa: FBT001, FBT002 + *, + blocking: bool = True, + is_singleton: bool = False, + loop: asyncio.AbstractEventLoop | None = None, + run_in_executor: bool = True, + executor: futures.Executor | None = None, + ) -> None: + """ + Create a new lock object. + + :param lock_file: path to the file + :param timeout: default timeout when acquiring the lock, in seconds. It will be used as fallback value in \ + the acquire method, if no timeout value (``None``) is given. If you want to disable the timeout, set it \ + to a negative value. A timeout of 0 means that there is exactly one attempt to acquire the file lock. + :param mode: file permissions for the lockfile + :param thread_local: Whether this object's internal context should be thread local or not. If this is set to \ + ``False`` then the lock will be reentrant across threads. + :param blocking: whether the lock should be blocking or not + :param is_singleton: If this is set to ``True`` then only one instance of this class will be created \ + per lock file. This is useful if you want to use the lock object for reentrant locking without needing \ + to pass the same object around. + :param loop: The event loop to use. If not specified, the running event loop will be used. + :param run_in_executor: If this is set to ``True`` then the lock will be acquired in an executor. + :param executor: The executor to use. If not specified, the default executor will be used. + + """ + self._is_thread_local = thread_local + self._is_singleton = is_singleton + if thread_local and run_in_executor: + msg = "run_in_executor is not supported when thread_local is True" + raise ValueError(msg) + + # Create the context. Note that external code should not work with the context directly and should instead use + # properties of this class. + kwargs: dict[str, Any] = { + "lock_file": os.fspath(lock_file), + "timeout": timeout, + "mode": mode, + "blocking": blocking, + "loop": loop, + "run_in_executor": run_in_executor, + "executor": executor, + } + self._context: AsyncFileLockContext = (AsyncThreadLocalFileContext if thread_local else AsyncFileLockContext)( + **kwargs + ) + + @property + def run_in_executor(self) -> bool: + """::return: whether run in executor.""" + return self._context.run_in_executor + + @property + def executor(self) -> futures.Executor | None: + """::return: the executor.""" + return self._context.executor + + @executor.setter + def executor(self, value: futures.Executor | None) -> None: # pragma: no cover + """ + Change the executor. + + :param value: the new executor or ``None`` + :type value: futures.Executor | None + + """ + self._context.executor = value + + @property + def loop(self) -> asyncio.AbstractEventLoop | None: + """::return: the event loop.""" + return self._context.loop + + async def acquire( # type: ignore[override] + self, + timeout: float | None = None, + poll_interval: float = 0.05, + *, + blocking: bool | None = None, + ) -> AsyncAcquireReturnProxy: + """ + Try to acquire the file lock. + + :param timeout: maximum wait time for acquiring the lock, ``None`` means use the default + :attr:`~BaseFileLock.timeout` is and if ``timeout < 0``, there is no timeout and + this method will block until the lock could be acquired + :param poll_interval: interval of trying to acquire the lock file + :param blocking: defaults to True. If False, function will return immediately if it cannot obtain a lock on the + first attempt. Otherwise, this method will block until the timeout expires or the lock is acquired. + :raises Timeout: if fails to acquire lock within the timeout period + :return: a context object that will unlock the file when the context is exited + + .. code-block:: python + + # You can use this method in the context manager (recommended) + with lock.acquire(): + pass + + # Or use an equivalent try-finally construct: + lock.acquire() + try: + pass + finally: + lock.release() + + """ + # Use the default timeout, if no timeout is provided. + if timeout is None: + timeout = self._context.timeout + + if blocking is None: + blocking = self._context.blocking + + # Increment the number right at the beginning. We can still undo it, if something fails. + self._context.lock_counter += 1 + + lock_id = id(self) + lock_filename = self.lock_file + start_time = time.perf_counter() + try: + while True: + if not self.is_locked: + _LOGGER.debug("Attempting to acquire lock %s on %s", lock_id, lock_filename) + await self._run_internal_method(self._acquire) + if self.is_locked: + _LOGGER.debug("Lock %s acquired on %s", lock_id, lock_filename) + break + if blocking is False: + _LOGGER.debug("Failed to immediately acquire lock %s on %s", lock_id, lock_filename) + raise Timeout(lock_filename) # noqa: TRY301 + if 0 <= timeout < time.perf_counter() - start_time: + _LOGGER.debug("Timeout on acquiring lock %s on %s", lock_id, lock_filename) + raise Timeout(lock_filename) # noqa: TRY301 + msg = "Lock %s not acquired on %s, waiting %s seconds ..." + _LOGGER.debug(msg, lock_id, lock_filename, poll_interval) + await asyncio.sleep(poll_interval) + except BaseException: # Something did go wrong, so decrement the counter. + self._context.lock_counter = max(0, self._context.lock_counter - 1) + raise + return AsyncAcquireReturnProxy(lock=self) + + async def release(self, force: bool = False) -> None: # type: ignore[override] # noqa: FBT001, FBT002 + """ + Releases the file lock. Please note, that the lock is only completely released, if the lock counter is 0. + Also note, that the lock file itself is not automatically deleted. + + :param force: If true, the lock counter is ignored and the lock is released in every case/ + + """ + if self.is_locked: + self._context.lock_counter -= 1 + + if self._context.lock_counter == 0 or force: + lock_id, lock_filename = id(self), self.lock_file + + _LOGGER.debug("Attempting to release lock %s on %s", lock_id, lock_filename) + await self._run_internal_method(self._release) + self._context.lock_counter = 0 + _LOGGER.debug("Lock %s released on %s", lock_id, lock_filename) + + async def _run_internal_method(self, method: Callable[[], Any]) -> None: + if asyncio.iscoroutinefunction(method): + await method() + elif self.run_in_executor: + loop = self.loop or asyncio.get_running_loop() + await loop.run_in_executor(self.executor, method) + else: + method() + + def __enter__(self) -> NoReturn: + """ + Replace old __enter__ method to avoid using it. + + NOTE: DO NOT USE `with` FOR ASYNCIO LOCKS, USE `async with` INSTEAD. + + :return: none + :rtype: NoReturn + """ + msg = "Do not use `with` for asyncio locks, use `async with` instead." + raise NotImplementedError(msg) + + async def __aenter__(self) -> Self: + """ + Acquire the lock. + + :return: the lock object + + """ + await self.acquire() + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + """ + Release the lock. + + :param exc_type: the exception type if raised + :param exc_value: the exception value if raised + :param traceback: the exception traceback if raised + + """ + await self.release() + + def __del__(self) -> None: + """Called when the lock object is deleted.""" + with contextlib.suppress(RuntimeError): + loop = self.loop or asyncio.get_running_loop() + if not loop.is_running(): # pragma: no cover + loop.run_until_complete(self.release(force=True)) + else: + loop.create_task(self.release(force=True)) + + +class AsyncSoftFileLock(SoftFileLock, BaseAsyncFileLock): + """Simply watches the existence of the lock file.""" + + +class AsyncUnixFileLock(UnixFileLock, BaseAsyncFileLock): + """Uses the :func:`fcntl.flock` to hard lock the lock file on unix systems.""" + + +class AsyncWindowsFileLock(WindowsFileLock, BaseAsyncFileLock): + """Uses the :func:`msvcrt.locking` to hard lock the lock file on windows systems.""" + + +__all__ = [ + "AsyncAcquireReturnProxy", + "AsyncSoftFileLock", + "AsyncUnixFileLock", + "AsyncWindowsFileLock", + "BaseAsyncFileLock", +] diff --git a/tests/test_async_filelock.py b/tests/test_async_filelock.py new file mode 100644 index 0000000..e0b150c --- /dev/null +++ b/tests/test_async_filelock.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +import logging +from pathlib import Path, PurePath + +import pytest + +from filelock import AsyncFileLock, AsyncSoftFileLock, BaseAsyncFileLock, Timeout + + +@pytest.mark.parametrize("lock_type", [AsyncFileLock, AsyncSoftFileLock]) +@pytest.mark.parametrize("path_type", [str, PurePath, Path]) +@pytest.mark.parametrize("filename", ["a", "new/b", "new2/new3/c"]) +@pytest.mark.asyncio() +async def test_simple( + lock_type: type[BaseAsyncFileLock], + path_type: type[str | Path], + filename: str, + tmp_path: Path, + caplog: pytest.LogCaptureFixture, +) -> None: + caplog.set_level(logging.DEBUG) + + # test lock creation by passing a `str` + lock_path = tmp_path / filename + lock = lock_type(path_type(lock_path)) + async with lock as locked: + assert lock.is_locked + assert lock is locked + assert not lock.is_locked + + assert caplog.messages == [ + f"Attempting to acquire lock {id(lock)} on {lock_path}", + f"Lock {id(lock)} acquired on {lock_path}", + f"Attempting to release lock {id(lock)} on {lock_path}", + f"Lock {id(lock)} released on {lock_path}", + ] + assert [r.levelno for r in caplog.records] == [logging.DEBUG, logging.DEBUG, logging.DEBUG, logging.DEBUG] + assert [r.name for r in caplog.records] == ["filelock", "filelock", "filelock", "filelock"] + assert logging.getLogger("filelock").level == logging.NOTSET + + +@pytest.mark.parametrize("lock_type", [AsyncFileLock, AsyncSoftFileLock]) +@pytest.mark.parametrize("path_type", [str, PurePath, Path]) +@pytest.mark.parametrize("filename", ["a", "new/b", "new2/new3/c"]) +@pytest.mark.asyncio() +async def test_acquire( + lock_type: type[BaseAsyncFileLock], + path_type: type[str | Path], + filename: str, + tmp_path: Path, + caplog: pytest.LogCaptureFixture, +) -> None: + caplog.set_level(logging.DEBUG) + + # test lock creation by passing a `str` + lock_path = tmp_path / filename + lock = lock_type(path_type(lock_path)) + async with await lock.acquire() as locked: + assert lock.is_locked + assert lock is locked + assert not lock.is_locked + + assert caplog.messages == [ + f"Attempting to acquire lock {id(lock)} on {lock_path}", + f"Lock {id(lock)} acquired on {lock_path}", + f"Attempting to release lock {id(lock)} on {lock_path}", + f"Lock {id(lock)} released on {lock_path}", + ] + assert [r.levelno for r in caplog.records] == [logging.DEBUG, logging.DEBUG, logging.DEBUG, logging.DEBUG] + assert [r.name for r in caplog.records] == ["filelock", "filelock", "filelock", "filelock"] + assert logging.getLogger("filelock").level == logging.NOTSET + + +@pytest.mark.parametrize("lock_type", [AsyncFileLock, AsyncSoftFileLock]) +@pytest.mark.asyncio() +async def test_non_blocking(lock_type: type[BaseAsyncFileLock], tmp_path: Path) -> None: + # raises Timeout error when the lock cannot be acquired + lock_path = tmp_path / "a" + lock_1, lock_2 = lock_type(str(lock_path)), lock_type(str(lock_path)) + lock_3 = lock_type(str(lock_path), blocking=False) + lock_4 = lock_type(str(lock_path), timeout=0) + lock_5 = lock_type(str(lock_path), blocking=False, timeout=-1) + + # acquire lock 1 + await lock_1.acquire() + assert lock_1.is_locked + assert not lock_2.is_locked + assert not lock_3.is_locked + assert not lock_4.is_locked + assert not lock_5.is_locked + + # try to acquire lock 2 + with pytest.raises(Timeout, match="The file lock '.*' could not be acquired."): + await lock_2.acquire(blocking=False) + assert not lock_2.is_locked + assert lock_1.is_locked + + # try to acquire pre-parametrized `blocking=False` lock 3 with `acquire` + with pytest.raises(Timeout, match="The file lock '.*' could not be acquired."): + await lock_3.acquire() + assert not lock_3.is_locked + assert lock_1.is_locked + + # try to acquire pre-parametrized `blocking=False` lock 3 with context manager + with pytest.raises(Timeout, match="The file lock '.*' could not be acquired."): + async with lock_3: + pass + assert not lock_3.is_locked + assert lock_1.is_locked + + # try to acquire pre-parametrized `timeout=0` lock 4 with `acquire` + with pytest.raises(Timeout, match="The file lock '.*' could not be acquired."): + await lock_4.acquire() + assert not lock_4.is_locked + assert lock_1.is_locked + + # try to acquire pre-parametrized `timeout=0` lock 4 with context manager + with pytest.raises(Timeout, match="The file lock '.*' could not be acquired."): + async with lock_4: + pass + assert not lock_4.is_locked + assert lock_1.is_locked + + # blocking precedence over timeout + # try to acquire pre-parametrized `timeout=-1,blocking=False` lock 5 with `acquire` + with pytest.raises(Timeout, match="The file lock '.*' could not be acquired."): + await lock_5.acquire() + assert not lock_5.is_locked + assert lock_1.is_locked + + # try to acquire pre-parametrized `timeout=-1,blocking=False` lock 5 with context manager + with pytest.raises(Timeout, match="The file lock '.*' could not be acquired."): + async with lock_5: + pass + assert not lock_5.is_locked + assert lock_1.is_locked + + # release lock 1 + await lock_1.release() + assert not lock_1.is_locked + assert not lock_2.is_locked + assert not lock_3.is_locked + assert not lock_4.is_locked + assert not lock_5.is_locked + + +@pytest.mark.parametrize("lock_type", [AsyncFileLock, AsyncSoftFileLock]) +@pytest.mark.parametrize("thread_local", [True, False]) +@pytest.mark.asyncio() +async def test_non_executor(lock_type: type[BaseAsyncFileLock], thread_local: bool, tmp_path: Path) -> None: + lock_path = tmp_path / "a" + lock = lock_type(str(lock_path), thread_local=thread_local, run_in_executor=False) + async with lock as locked: + assert lock.is_locked + assert lock is locked + assert not lock.is_locked + + +@pytest.mark.asyncio() +async def test_coroutine_function(tmp_path: Path) -> None: + acquired = released = False + + class AioFileLock(BaseAsyncFileLock): + async def _acquire(self) -> None: # type: ignore[override] + nonlocal acquired + acquired = True + self._context.lock_file_fd = 1 + + async def _release(self) -> None: # type: ignore[override] + nonlocal released + released = True + self._context.lock_file_fd = None + + lock = AioFileLock(str(tmp_path / "a")) + await lock.acquire() + assert acquired + assert not released + await lock.release() + assert acquired + assert released