"""Async wrapper around :class:`SoftReadWriteLock` for use with ``asyncio``."""

from __future__ import annotations

import asyncio
import functools
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING

from ._sync import SoftReadWriteLock

if TYPE_CHECKING:
    import os
    from collections.abc import AsyncGenerator, Callable
    from concurrent import futures
    from types import TracebackType


class AsyncAcquireSoftReadWriteReturnProxy:
    """
    Async context manager handed back by the ``acquire_*`` methods of :class:`AsyncSoftReadWriteLock`.

    Entering yields the wrapped lock; leaving awaits a single ``release()`` on it.

    """

    def __init__(self, lock: AsyncSoftReadWriteLock) -> None:
        # Public attribute: the lock stays reachable through the proxy.
        self.lock = lock

    async def __aenter__(self) -> AsyncSoftReadWriteLock:
        # Entering performs no acquisition of its own; it only hands back the wrapped lock.
        wrapped = self.lock
        return wrapped

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        # The exception details are ignored and ``None`` is returned, so an error raised inside the
        # ``async with`` body still propagates after the release.
        wrapped = self.lock
        await wrapped.release()


class AsyncSoftReadWriteLock:
    """
    ``asyncio`` front end for :class:`SoftReadWriteLock`.

    Each blocking filesystem operation of the synchronous lock is handed to ``loop.run_in_executor()`` and awaited.
    Reentrancy, upgrade/downgrade rules, fork handling, heartbeat and TTL stale detection, and singleton behavior
    all come from the wrapped :class:`SoftReadWriteLock`.

    :param lock_file: path to the lock file; sidecar state/write/readers live next to it
    :param timeout: maximum number of seconds to wait; ``-1`` blocks indefinitely
    :param blocking: when ``False``, contention raises :class:`~filelock.Timeout` immediately
    :param is_singleton: when ``True``, existing :class:`SoftReadWriteLock` instances are reused per resolved path
    :param heartbeat_interval: seconds between heartbeat refreshes; default 30 s
    :param stale_threshold: seconds of mtime inactivity before a marker is stale; defaults to
        ``3 * heartbeat_interval``
    :param poll_interval: seconds between acquire retries under contention; default 0.25 s
    :param loop: event loop whose ``run_in_executor`` is used; ``None`` selects the running loop
    :param executor: executor passed to ``run_in_executor``; ``None`` selects the default executor

    .. versionadded:: 3.27.0

    """

    def __init__(  # noqa: PLR0913
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,
        heartbeat_interval: float = 30.0,
        stale_threshold: float | None = None,
        poll_interval: float = 0.25,
        loop: asyncio.AbstractEventLoop | None = None,
        executor: futures.Executor | None = None,
    ) -> None:
        # Every lock-behavior option is forwarded untouched to the synchronous lock; only ``loop`` and
        # ``executor`` are consumed by this wrapper.
        sync_lock = SoftReadWriteLock(
            lock_file,
            timeout,
            blocking=blocking,
            is_singleton=is_singleton,
            heartbeat_interval=heartbeat_interval,
            stale_threshold=stale_threshold,
            poll_interval=poll_interval,
        )
        self._lock = sync_lock
        self._loop, self._executor = loop, executor

    @property
    def lock_file(self) -> str:
        """:returns: the lock file path, as reported by the wrapped synchronous lock."""
        return self._lock.lock_file

    @property
    def timeout(self) -> float:
        """:returns: the default timeout used when ``acquire_read`` / ``acquire_write`` is called without one."""
        return self._lock.timeout

    @property
    def blocking(self) -> bool:
        """:returns: the default blocking mode of ``acquire_*``; ``False`` makes contention raise immediately."""
        return self._lock.blocking

    @property
    def loop(self) -> asyncio.AbstractEventLoop | None:
        """:returns: the event loop given to the constructor, or ``None`` when the running loop is used."""
        return self._loop

    @property
    def executor(self) -> futures.Executor | None:
        """:returns: the executor given to the constructor, or ``None`` when the default executor is used."""
        return self._executor

    async def acquire_read(
        self, timeout: float | None = None, *, blocking: bool | None = None
    ) -> AsyncAcquireSoftReadWriteReturnProxy:
        """
        Take a shared read lock.

        The reentrancy / upgrade / fork semantics are those of :meth:`SoftReadWriteLock.acquire_read`. The blocking
        part executes inside ``run_in_executor``, so other coroutines on the same loop keep running while this call
        waits.

        :param timeout: maximum number of seconds to wait, or ``None`` for the instance default
        :param blocking: when ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` selects the instance
            default

        :returns: a proxy that releases the lock when used as an async context manager

        :raises RuntimeError: if a write lock is already held, if this instance was invalidated by
            :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        sync_acquire = self._lock.acquire_read
        await self._run(sync_acquire, timeout, blocking=blocking)
        # The proxy is only built once the synchronous acquire has returned.
        return AsyncAcquireSoftReadWriteReturnProxy(lock=self)

    async def acquire_write(
        self, timeout: float | None = None, *, blocking: bool | None = None
    ) -> AsyncAcquireSoftReadWriteReturnProxy:
        """
        Take an exclusive write lock.

        The two-phase writer-preferring semantics are those of :meth:`SoftReadWriteLock.acquire_write`. The blocking
        part executes inside ``run_in_executor``.

        :param timeout: maximum number of seconds to wait, or ``None`` for the instance default
        :param blocking: when ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` selects the instance
            default

        :returns: a proxy that releases the lock when used as an async context manager

        :raises RuntimeError: if a read lock is already held, if a write lock is held by a different thread, if
            this instance was invalidated by :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        sync_acquire = self._lock.acquire_write
        await self._run(sync_acquire, timeout, blocking=blocking)
        # The proxy is only built once the synchronous acquire has returned.
        return AsyncAcquireSoftReadWriteReturnProxy(lock=self)

    async def release(self, *, force: bool = False) -> None:
        """
        Give back one level of the lock currently held.

        :param force: when ``True``, drop the lock completely regardless of the current lock level

        :raises RuntimeError: if no lock is currently held and *force* is ``False``

        """
        sync_release = self._lock.release
        await self._run(sync_release, force=force)

    @asynccontextmanager
    async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
        """
        Hold a shared read lock for the duration of an ``async with`` body.

        :param timeout: maximum number of seconds to wait, or ``None`` for the instance default
        :param blocking: when ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` selects the instance
            default

        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        # Acquire outside the ``try`` so a failed acquire never triggers a release.
        await self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            # One level is released even when the managed body raises.
            await self.release()

    @asynccontextmanager
    async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
        """
        Hold an exclusive write lock for the duration of an ``async with`` body.

        :param timeout: maximum number of seconds to wait, or ``None`` for the instance default
        :param blocking: when ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` selects the instance
            default

        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        # Acquire outside the ``try`` so a failed acquire never triggers a release.
        await self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            # One level is released even when the managed body raises.
            await self.release()

    async def close(self) -> None:
        """Release any held lock and free the underlying filesystem resources. Idempotent."""
        sync_close = self._lock.close
        await self._run(sync_close)

    async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object:
        """
        Execute ``func(*args, **kwargs)`` through ``run_in_executor`` and return its result.

        The loop passed to the constructor is used when it is truthy, otherwise the running loop is looked up at
        call time.
        """
        # NOTE(review): with a thread-pool executor, cancelling the awaiting task presumably does not interrupt a
        # call that is already running -- confirm how a lock acquired after cancellation gets released.
        target_loop = self._loop
        if not target_loop:
            target_loop = asyncio.get_running_loop()
        # ``run_in_executor`` forwards positional arguments only, so the keywords are bound up front.
        bound_call = functools.partial(func, *args, **kwargs)
        return await target_loop.run_in_executor(self._executor, bound_call)


# Public API of this module: the async lock and the release proxy returned by its ``acquire_*`` methods.
__all__ = [
    "AsyncAcquireSoftReadWriteReturnProxy",
    "AsyncSoftReadWriteLock",
]
