import os
import time
from enum import Flag, IntEnum
from pathlib import Path

import cython
import cython.cimports.libav as lib
from cython.cimports.av.container.core import timeout_info
from cython.cimports.av.container.input import InputContainer
from cython.cimports.av.container.output import OutputContainer
from cython.cimports.av.container.pyio import pyio_close_custom_gil, pyio_close_gil
from cython.cimports.av.error import err_check, stash_exception
from cython.cimports.av.format import build_container_format
from cython.cimports.av.utils import (
    avdict_to_dict,
    avrational_to_fraction,
    dict_to_avdict,
    to_avrational,
)
from cython.cimports.libc.stdint import int64_t
from cython.operator import dereference

from av.logging import Capture as LogCapture

# Private token that `open()` passes as the first constructor argument.
# `Container.__cinit__` raises RuntimeError for any other value, which keeps
# the base container from being constructed directly.
_cinit_sentinel = cython.declare(object, object())

# C-level alias for `AVChapter **`, the type of the chapter array that
# `Container.set_chapters` allocates and stores on the format context.
AVChapterPtrPtr = cython.typedef(cython.pointer[cython.pointer[lib.AVChapter]])


@cython.cfunc
@cython.nogil
@cython.exceptval(check=False)
def interrupt_cb(p: cython.p_void) -> cython.int:
    # Interrupt callback installed on the format context by
    # `Container.__cinit__`.  `p` points at the container's `timeout_info`
    # record.  Returns 1 once the timeout has elapsed (or the clock looks
    # inconsistent) and 0 otherwise.
    state: timeout_info = dereference(cython.cast(cython.pointer[timeout_info], p))
    now: cython.double

    # A negative timeout is the "no timeout" marker.
    if state.timeout < 0:
        return 0

    # Reading the clock is a Python call, so the GIL is required for it.
    with cython.gil:
        now = time.monotonic()
        if now < state.start_time:
            # Stash the error so that it is raised once we are back in Python.
            stash_exception(
                (
                    RuntimeError,
                    RuntimeError("Clock has been changed to before timeout start"),
                    None,
                )
            )
            return 1

    return 1 if now > state.start_time + state.timeout else 0


@cython.cfunc
@cython.nogil
@cython.exceptval(check=False)
def pyav_io_open(
    s: cython.pointer[lib.AVFormatContext],
    pb: cython.pointer[cython.pointer[lib.AVIOContext]],
    url: cython.p_const_char,
    flags: cython.int,
    options: cython.pointer[cython.pointer[lib.AVDictionary]],
) -> cython.int:
    # nogil entry point installed as the context's `io_open` hook: take the
    # GIL and forward every argument to the Python-aware implementation.
    status: cython.int
    with cython.gil:
        status = pyav_io_open_gil(s, pb, url, flags, options)
    return status


@cython.cfunc
@cython.exceptval(check=False)
def pyav_io_open_gil(
    s: cython.pointer[lib.AVFormatContext],
    pb: cython.pointer[cython.pointer[lib.AVIOContext]],
    url: cython.p_const_char,
    flags: cython.int,
    options: cython.pointer[cython.pointer[lib.AVDictionary]],
) -> cython.int:
    # Open an additional stream through the container's user-supplied
    # `io_open` callable and hand the resulting AVIOContext back via `pb`.
    # Returns 0 on success; any Python exception is stashed and the value of
    # `stash_exception()` is returned instead.
    owner: Container
    stream: object
    wrapper: PyIOFile
    try:
        # The container stored itself in the context's `opaque` slot.
        owner = cython.cast(Container, dereference(s).opaque)

        # Turn the C option dictionary (when one was supplied) into a dict.
        if options is cython.NULL:
            opts = {}
        else:
            opts = avdict_to_dict(
                dereference(options),
                encoding=owner.metadata_encoding,
                errors=owner.metadata_errors,
            )

        target = cython.cast(str, url) if url is not cython.NULL else ""
        stream = owner.io_open(target, flags, opts)

        for_writing = (flags & lib.AVIO_FLAG_WRITE) != 0
        wrapper = PyIOFile(stream, owner.buffer_size, for_writing)

        # Keep a reference on the container so the wrapper is not deallocated
        # while the AVIOContext is still in use.
        owner.open_files[cython.cast(int64_t, wrapper.iocontext.opaque)] = wrapper
        pb[0] = wrapper.iocontext
        return 0
    except Exception:
        return stash_exception()


@cython.cfunc
@cython.nogil
@cython.exceptval(check=False)
def pyav_io_close(
    s: cython.pointer[lib.AVFormatContext], pb: cython.pointer[lib.AVIOContext]
) -> cython.int:
    # nogil entry point installed as the context's `io_close2` hook: take the
    # GIL and forward to the Python-aware implementation.
    status: cython.int
    with cython.gil:
        status = pyav_io_close_gil(s, pb)
    return status


@cython.cfunc
@cython.exceptval(check=False)
def pyav_io_close_gil(
    s: cython.pointer[lib.AVFormatContext], pb: cython.pointer[lib.AVIOContext]
) -> cython.int:
    # Close `pb`.  Contexts that were registered in the container's
    # `open_files` are closed with `pyio_close_custom_gil` and unregistered;
    # everything else goes through `pyio_close_gil`.  Returns the close
    # result, or AVERROR_UNKNOWN after stashing a Python exception.
    owner: Container
    status: cython.int = 0
    key: int64_t
    try:
        owner = cython.cast(Container, dereference(s).opaque)
        if owner.open_files is None:
            status = pyio_close_gil(pb)
        else:
            key = cython.cast(int64_t, pb.opaque)
            if key in owner.open_files:
                status = pyio_close_custom_gil(pb)

                # Drop the container's reference so the wrapper can be
                # deallocated.
                del owner.open_files[key]
            else:
                status = pyio_close_gil(pb)

    except Exception:
        stash_exception()
        status = lib.AVERROR_UNKNOWN  # Or another appropriate error code

    return status


@cython.cfunc
@cython.nogil
@cython.exceptval(check=False)
def _free_chapters(ctx: cython.pointer[lib.AVFormatContext]) -> cython.void:
    # Release every chapter held by `ctx` (its metadata dictionary first, then
    # the chapter itself), then the chapter array, and reset the count to 0.
    idx: cython.Py_ssize_t

    if ctx.chapters == cython.NULL:
        # No array to walk; only the counter needs resetting.
        ctx.nb_chapters = 0
        return

    for idx in range(ctx.nb_chapters):
        if ctx.chapters[idx] == cython.NULL:
            continue
        if ctx.chapters[idx].metadata != cython.NULL:
            lib.av_dict_free(cython.address(ctx.chapters[idx].metadata))
        lib.av_freep(cython.cast(cython.pp_void, cython.address(ctx.chapters[idx])))

    lib.av_freep(cython.cast(cython.pp_void, cython.address(ctx.chapters)))
    ctx.nb_chapters = 0


# fmt: off
class Flags(Flag):
    """Format-context bit flags, one member per libav ``AVFMT_FLAG_*`` constant.

    The string annotation on each member is its human-readable description.
    """
    gen_pts: "Generate missing pts even if it requires parsing future frames." = lib.AVFMT_FLAG_GENPTS
    ign_idx: "Ignore index." = lib.AVFMT_FLAG_IGNIDX
    non_block: "Do not block when reading packets from input." = lib.AVFMT_FLAG_NONBLOCK
    ign_dts: "Ignore DTS on frames that contain both DTS & PTS." = lib.AVFMT_FLAG_IGNDTS
    no_fillin: "Do not infer any values from other values, just return what is stored in the container." = lib.AVFMT_FLAG_NOFILLIN
    no_parse: "Do not use AVParsers, you also must set AVFMT_FLAG_NOFILLIN as the fill in code works on frames and no parsing -> no frames. Also seeking to frames can not work if parsing to find frame boundaries has been disabled." = lib.AVFMT_FLAG_NOPARSE
    no_buffer: "Do not buffer frames when possible." = lib.AVFMT_FLAG_NOBUFFER
    custom_io: "The caller has supplied a custom AVIOContext, don't avio_close() it." = lib.AVFMT_FLAG_CUSTOM_IO
    discard_corrupt: "Discard frames marked corrupted." = lib.AVFMT_FLAG_DISCARD_CORRUPT
    flush_packets: "Flush the AVIOContext every packet." = lib.AVFMT_FLAG_FLUSH_PACKETS
    bitexact: "When muxing, try to avoid writing any random/volatile data to the output. This includes any random IDs, real-time timestamps/dates, muxer version, etc. This flag is mainly intended for testing." = lib.AVFMT_FLAG_BITEXACT
    sort_dts: "Try to interleave outputted packets by dts (using this flag can slow demuxing down)." = lib.AVFMT_FLAG_SORT_DTS
    fast_seek: "Enable fast, but inaccurate seeks for some formats." = lib.AVFMT_FLAG_FAST_SEEK
    auto_bsf: "Add bitstream filters as requested by the muxer." = lib.AVFMT_FLAG_AUTO_BSF

class AudioCodec(IntEnum):
    """Enumeration for audio codec IDs.

    Maps lower-case codec names to libav ``AV_CODEC_ID_*`` values; only
    ``none`` and the PCM family are listed. ``Container.__cinit__`` looks a
    member up by name when an input format is given as ``"format:acodec"``.
    """
    none = lib.AV_CODEC_ID_NONE  # No codec.
    pcm_alaw = lib.AV_CODEC_ID_PCM_ALAW  # PCM A-law.
    pcm_bluray = lib.AV_CODEC_ID_PCM_BLURAY  # PCM Blu-ray.
    pcm_dvd = lib.AV_CODEC_ID_PCM_DVD  # PCM DVD.
    pcm_f16le = lib.AV_CODEC_ID_PCM_F16LE  # PCM F16 little-endian.
    pcm_f24le = lib.AV_CODEC_ID_PCM_F24LE  # PCM F24 little-endian.
    pcm_f32be = lib.AV_CODEC_ID_PCM_F32BE  # PCM F32 big-endian.
    pcm_f32le = lib.AV_CODEC_ID_PCM_F32LE  # PCM F32 little-endian.
    pcm_f64be = lib.AV_CODEC_ID_PCM_F64BE  # PCM F64 big-endian.
    pcm_f64le = lib.AV_CODEC_ID_PCM_F64LE  # PCM F64 little-endian.
    pcm_lxf = lib.AV_CODEC_ID_PCM_LXF  # PCM LXF.
    pcm_mulaw = lib.AV_CODEC_ID_PCM_MULAW  # PCM μ-law.
    pcm_s16be = lib.AV_CODEC_ID_PCM_S16BE  # PCM signed 16-bit big-endian.
    pcm_s16be_planar = lib.AV_CODEC_ID_PCM_S16BE_PLANAR  # PCM signed 16-bit big-endian planar.
    pcm_s16le = lib.AV_CODEC_ID_PCM_S16LE  # PCM signed 16-bit little-endian.
    pcm_s16le_planar = lib.AV_CODEC_ID_PCM_S16LE_PLANAR  # PCM signed 16-bit little-endian planar.
    pcm_s24be = lib.AV_CODEC_ID_PCM_S24BE  # PCM signed 24-bit big-endian.
    pcm_s24daud = lib.AV_CODEC_ID_PCM_S24DAUD  # PCM signed 24-bit D-Cinema audio.
    pcm_s24le = lib.AV_CODEC_ID_PCM_S24LE  # PCM signed 24-bit little-endian.
    pcm_s24le_planar = lib.AV_CODEC_ID_PCM_S24LE_PLANAR  # PCM signed 24-bit little-endian planar.
    pcm_s32be = lib.AV_CODEC_ID_PCM_S32BE  # PCM signed 32-bit big-endian.
    pcm_s32le = lib.AV_CODEC_ID_PCM_S32LE  # PCM signed 32-bit little-endian.
    pcm_s32le_planar = lib.AV_CODEC_ID_PCM_S32LE_PLANAR  # PCM signed 32-bit little-endian planar.
    pcm_s64be = lib.AV_CODEC_ID_PCM_S64BE  # PCM signed 64-bit big-endian.
    pcm_s64le = lib.AV_CODEC_ID_PCM_S64LE  # PCM signed 64-bit little-endian.
    pcm_s8 = lib.AV_CODEC_ID_PCM_S8  # PCM signed 8-bit.
    pcm_s8_planar = lib.AV_CODEC_ID_PCM_S8_PLANAR  # PCM signed 8-bit planar.
    pcm_u16be = lib.AV_CODEC_ID_PCM_U16BE  # PCM unsigned 16-bit big-endian.
    pcm_u16le = lib.AV_CODEC_ID_PCM_U16LE  # PCM unsigned 16-bit little-endian.
    pcm_u24be = lib.AV_CODEC_ID_PCM_U24BE  # PCM unsigned 24-bit big-endian.
    pcm_u24le = lib.AV_CODEC_ID_PCM_U24LE  # PCM unsigned 24-bit little-endian.
    pcm_u32be = lib.AV_CODEC_ID_PCM_U32BE  # PCM unsigned 32-bit big-endian.
    pcm_u32le = lib.AV_CODEC_ID_PCM_U32LE  # PCM unsigned 32-bit little-endian.
    pcm_u8 = lib.AV_CODEC_ID_PCM_U8  # PCM unsigned 8-bit.
    pcm_vidc = lib.AV_CODEC_ID_PCM_VIDC  # PCM VIDC.
# fmt: on


@cython.cclass
class Container:
    """Base class of input and output containers.

    Owns the libav format context (``self.ptr``) and the optional Python I/O
    wrappers attached to it. Instances are created through the module-level
    ``open()`` function, never directly.
    """

    def __cinit__(
        self,
        sentinel,
        file_,
        format_name,
        options,
        container_options,
        stream_options,
        hwaccel,
        metadata_encoding,
        metadata_errors,
        buffer_size,
        open_timeout,
        read_timeout,
        io_open,
    ):
        # Allocate and configure the format context.  For input containers
        # the input is also opened here; output contexts are only allocated.
        #
        # Raises RuntimeError when constructed without the private sentinel or
        # from a class that is neither an InputContainer nor an
        # OutputContainer, ValueError when no output format can be determined,
        # and MemoryError when the input context cannot be allocated.
        if sentinel is not _cinit_sentinel:
            raise RuntimeError("cannot construct base Container")

        writeable: cython.bint = isinstance(self, OutputContainer)
        if not writeable and not isinstance(self, InputContainer):
            raise RuntimeError("Container cannot be directly extended.")

        if isinstance(file_, str):
            self.name = file_
        else:
            self.name = str(getattr(file_, "name", "<none>"))

        # Copy the option mappings so later changes by the caller have no
        # effect on this container.
        self.options = dict(options or ())
        self.container_options = dict(container_options or ())
        self.stream_options = [dict(x) for x in stream_options or ()]

        self.hwaccel = hwaccel

        self.metadata_encoding = metadata_encoding
        self.metadata_errors = metadata_errors

        self.open_timeout = open_timeout
        self.read_timeout = read_timeout

        self.buffer_size = buffer_size
        self.io_open = io_open

        # A format given as "name:acodec" also selects an audio codec.
        acodec = None  # no audio codec specified
        if format_name is not None:
            if ":" in format_name:
                format_name, acodec = format_name.split(":")
            self.format = ContainerFormat(format_name)

        res: cython.int
        name_obj: bytes = os.fsencode(self.name)
        name: cython.p_char = name_obj
        ofmt: cython.pointer[cython.const[lib.AVOutputFormat]]

        if writeable:
            self._myflag |= 1  # enum.writeable = True
            ofmt = (
                self.format.optr
                if self.format
                else lib.av_guess_format(cython.NULL, name, cython.NULL)
            )
            if ofmt == cython.NULL:
                raise ValueError("Could not determine output format")

            with cython.nogil:
                # This does not actually open the file.
                res = lib.avformat_alloc_output_context2(
                    cython.address(self.ptr),
                    ofmt,
                    cython.NULL,
                    name,
                )
            self.err_check(res)
        else:
            # We need the context before we open the input AND setup Python IO.
            self.ptr = lib.avformat_alloc_context()
            if self.ptr == cython.NULL:
                # Every statement below dereferences the context, so fail
                # cleanly instead of crashing on a NULL pointer.
                raise MemoryError("avformat_alloc_context failed")

            # Setup interrupt callback
            if self.open_timeout is not None or self.read_timeout is not None:
                self.ptr.interrupt_callback.callback = interrupt_cb
                self.ptr.interrupt_callback.opaque = cython.address(
                    self.interrupt_callback_info
                )

            if acodec is not None:
                self.ptr.audio_codec_id = getattr(AudioCodec, acodec)

        self.ptr.flags |= lib.AVFMT_FLAG_GENPTS
        # Lets the io_open/io_close callbacks recover this container.
        self.ptr.opaque = cython.cast(cython.p_void, self)

        # Setup Python IO.
        self.open_files = {}
        if not isinstance(file_, str):
            self.file = PyIOFile(file_, buffer_size, writeable)
            self.ptr.pb = self.file.iocontext

        if io_open is not None:
            self.ptr.io_open = pyav_io_open
            self.ptr.io_close2 = pyav_io_close
            self.ptr.flags |= lib.AVFMT_FLAG_CUSTOM_IO

        ifmt: cython.pointer[cython.const[lib.AVInputFormat]]
        c_options: Dictionary
        if not writeable:
            ifmt = self.format.iptr if self.format else cython.NULL
            c_options = Dictionary(self.options, self.container_options)

            # The open timeout only applies while the input is being opened.
            self.set_timeout(self.open_timeout)
            self.start_timeout()
            with cython.nogil:
                res = lib.avformat_open_input(
                    cython.address(self.ptr), name, ifmt, cython.address(c_options.ptr)
                )
            self.set_timeout(None)
            self.err_check(res)
            self._myflag |= 2  # enum.input_was_opened = True

        if format_name is None:
            self.format = build_container_format(self.ptr.iformat, self.ptr.oformat)

    def __dealloc__(self):
        # Release the format context together with the instance.
        with cython.nogil:
            lib.avformat_free_context(self.ptr)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Leaving the ``with`` block always closes the container.
        self.close()

    def close(self):
        """Close the container; the base class only raises NotImplementedError."""
        raise NotImplementedError

    def __repr__(self):
        return f"<av.{self.__class__.__name__} {self.file or self.name!r}>"

    @cython.cfunc
    @cython.exceptval(-1, check=False)
    def err_check(self, value: cython.int) -> cython.int:
        """Pass ``value`` to ``err_check`` with this container's name as filename."""
        return err_check(value, filename=self.name)

    def dumps_format(self):
        """Return the text that ``av_dump_format`` logs for this container."""
        self._assert_open()
        with LogCapture() as logs:
            lib.av_dump_format(self.ptr, 0, "", isinstance(self, OutputContainer))
        return "".join(log[2] for log in logs)

    @cython.cfunc
    def set_timeout(self, timeout):
        """Store the timeout for the interrupt callback; ``None`` disables it."""
        if timeout is None:
            # Negative values are the callback's "no timeout" marker.
            self.interrupt_callback_info.timeout = -1.0
        else:
            self.interrupt_callback_info.timeout = timeout

    @cython.cfunc
    def start_timeout(self):
        """Record the current monotonic time as the start of the timeout."""
        self.interrupt_callback_info.start_time = time.monotonic()

    @cython.cfunc
    def _assert_open(self):
        """Raise AssertionError when the format context pointer is NULL."""
        if self.ptr == cython.NULL:
            raise AssertionError("Container is not open")

    @property
    def flags(self):
        """The raw integer ``flags`` field of the format context."""
        self._assert_open()
        return self.ptr.flags

    @flags.setter
    def flags(self, value: cython.int):
        self._assert_open()
        self.ptr.flags = value

    @property
    def input_was_opened(self):
        """Non-zero once ``avformat_open_input`` has succeeded for this container."""
        return self._myflag & 2

    def chapters(self):
        """Return the container's chapters as a list of dicts.

        Each dict has the keys ``id``, ``start``, ``end``, ``time_base`` and
        ``metadata``.
        """
        self._assert_open()
        result: list = []
        i: cython.Py_ssize_t
        for i in range(self.ptr.nb_chapters):
            ch = self.ptr.chapters[i]
            result.append(
                {
                    "id": ch.id,
                    "start": ch.start,
                    "end": ch.end,
                    "time_base": avrational_to_fraction(cython.address(ch.time_base)),
                    "metadata": avdict_to_dict(
                        ch.metadata, self.metadata_encoding, self.metadata_errors
                    ),
                }
            )
        return result

    def set_chapters(self, chapters):
        """Replace the container's chapters.

        :param chapters: Sequence of dicts with the keys ``id``, ``start``,
            ``end`` and ``time_base``, plus an optional ``metadata`` dict.
        :raises MemoryError: If an allocation fails.

        The existing chapters are freed first. If building the new list fails
        part-way, everything allocated so far is released, the container is
        left without chapters, and the error is re-raised.
        """
        self._assert_open()

        count: cython.Py_ssize_t = len(chapters)
        filled: cython.Py_ssize_t = 0
        i: cython.Py_ssize_t
        ch_array: AVChapterPtrPtr
        ch: cython.pointer[lib.AVChapter]
        entry: dict

        with cython.nogil:
            _free_chapters(self.ptr)

        ch_array = cython.cast(
            AVChapterPtrPtr,
            lib.av_malloc(count * cython.sizeof(cython.pointer[lib.AVChapter])),
        )
        if ch_array == cython.NULL:
            raise MemoryError("av_malloc failed for chapters")

        try:
            for i in range(count):
                entry = chapters[i]
                ch = cython.cast(
                    cython.pointer[lib.AVChapter],
                    lib.av_malloc(cython.sizeof(lib.AVChapter)),
                )
                if ch == cython.NULL:
                    raise MemoryError("av_malloc failed for chapter")

                # Record the chapter before filling it in, with a NULL
                # metadata pointer, so the cleanup path below can always
                # free it safely.
                ch.metadata = cython.NULL
                ch_array[i] = ch
                filled = i + 1

                ch.id = entry["id"]
                ch.start = cython.cast(int64_t, entry["start"])
                ch.end = cython.cast(int64_t, entry["end"])
                to_avrational(entry["time_base"], cython.address(ch.time_base))
                if "metadata" in entry:
                    dict_to_avdict(
                        cython.address(ch.metadata),
                        entry["metadata"],
                        self.metadata_encoding,
                        self.metadata_errors,
                    )
        except BaseException:
            # Hand the partially built array to the context just long enough
            # for _free_chapters to release it, then propagate the error.
            self.ptr.chapters = ch_array
            self.ptr.nb_chapters = filled
            with cython.nogil:
                _free_chapters(self.ptr)
            raise

        self.ptr.nb_chapters = count
        self.ptr.chapters = ch_array


def open(
    file,
    mode=None,
    format=None,
    options=None,
    container_options=None,
    stream_options=None,
    metadata_encoding="utf-8",
    metadata_errors="strict",
    buffer_size=32768,
    timeout=None,
    io_open=None,
    hwaccel=None,
):
    """open(file, mode='r', **kwargs)

    Main entrypoint to opening files/streams.

    :param str file: The file to open, which can be either a string or a file-like object.
    :param str mode: ``"r"`` for reading and ``"w"`` for writing. When omitted and
        ``file`` is a file-like object with a string ``mode`` attribute, that
        attribute is used; otherwise the default is ``"r"``.
    :param str format: Specific format to use. Defaults to autodetect.
    :param dict options: Options to pass to the container and all streams.
    :param dict container_options: Options to pass to the container.
    :param list stream_options: Options to pass to each stream.
    :param str metadata_encoding: Encoding to use when reading or writing file metadata.
        Defaults to ``"utf-8"``.
    :param str metadata_errors: Specifies how to handle encoding errors; behaves like
        ``str.encode`` parameter. Defaults to ``"strict"``.
    :param int buffer_size: Size of buffer for Python input/output operations in bytes.
        Honored only when ``file`` is a file-like object. Defaults to 32768 (32k).
    :param timeout: How many seconds to wait for data before giving up, as a float, or a
        ``(open timeout, read timeout)`` tuple.
    :param callable io_open: Custom I/O callable for opening files/streams.
        This option is intended for formats that need to open additional
        file-like objects to ``file`` using custom I/O.
        The callable signature is ``io_open(url: str, flags: int, options: dict)``, where
        ``url`` is the url to open, ``flags`` is a combination of AVIO_FLAG_* and
        ``options`` is a dictionary of additional options. The callable should return a
        file-like object.
    :param HWAccel hwaccel: Optional settings for hardware-accelerated decoding.
    :rtype: Container
    :raises ValueError: If ``mode`` is not ``"r"``, ``"w"`` or ``None``, if a
        ``timeout`` tuple does not have exactly two items, or if
        ``stream_options`` is given when opening for writing.

    For devices (via ``libavdevice``), pass the name of the device to ``format``,
    e.g.::

        >>> # Open webcam on MacOS.
        >>> av.open('0', format='avfoundation') # doctest: +SKIP

    For DASH and custom I/O using ``io_open``, add a protocol prefix to the ``file`` to
    prevent the DASH encoder defaulting to the file protocol and using temporary files.
    The custom I/O callable can be used to remove the protocol prefix to reveal the actual
    name for creating the file-like object. E.g.::

        >>> av.open("customprotocol://manifest.mpd", "w", io_open=custom_io) # doctest: +SKIP

    .. seealso:: :ref:`garbage_collection`

    More information on using input and output devices is available on the
    `FFmpeg website <https://www.ffmpeg.org/ffmpeg-devices.html>`_.
    """

    if mode not in (None, "r", "w"):
        raise ValueError(f"mode must be 'r', 'w', or None, got: {mode}")

    if isinstance(file, Path):
        file = str(file)
    elif mode is None and not isinstance(file, str):
        # File-like object: adopt its own mode (e.g. "rb", "wb").  Some file
        # objects expose a non-string ``mode`` (an integer, for instance);
        # those are ignored so the default below applies instead of failing
        # on ``mode.startswith``.
        file_mode = getattr(file, "mode", None)
        if isinstance(file_mode, str):
            mode = file_mode

    if mode is None:
        mode = "r"

    if isinstance(timeout, tuple):
        if len(timeout) != 2:
            raise ValueError("timeout must be `float` or `tuple[float, float]`")
        open_timeout, read_timeout = timeout
    else:
        # A single value (or None) applies to both opening and reading.
        open_timeout = read_timeout = timeout

    if mode.startswith("r"):
        return InputContainer(
            _cinit_sentinel,
            file,
            format,
            options,
            container_options,
            stream_options,
            hwaccel,
            metadata_encoding,
            metadata_errors,
            buffer_size,
            open_timeout,
            read_timeout,
            io_open,
        )

    if stream_options:
        raise ValueError(
            "Provide stream options via Container.add_stream(..., options={})."
        )
    # hwaccel is not forwarded when writing.
    return OutputContainer(
        _cinit_sentinel,
        file,
        format,
        options,
        container_options,
        stream_options,
        None,
        metadata_encoding,
        metadata_errors,
        buffer_size,
        open_timeout,
        read_timeout,
        io_open,
    )
