# Copyright (c) 2022 Tulir Asokan
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import annotations

from collections import defaultdict
import asyncio
import logging
import sys
import time

from mautrix import __optional_imports__
from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY, AppService
from mautrix.errors import (
    DecryptionError,
    IntentError,
    MatrixError,
    MExclusive,
    MForbidden,
    MUnknownToken,
    SessionNotFound,
)
from mautrix.types import (
    BaseRoomEvent,
    BeeperMessageStatusEventContent,
    EncryptedEvent,
    Event,
    EventID,
    EventType,
    MediaRepoConfig,
    Membership,
    MemberStateEventContent,
    MessageEvent,
    MessageEventContent,
    MessageStatus,
    MessageStatusReason,
    MessageType,
    PresenceEvent,
    ReactionEvent,
    ReceiptEvent,
    ReceiptType,
    RedactionEvent,
    RelatesTo,
    RelationType,
    RoomID,
    RoomType,
    SingleReceiptEventContent,
    SpecVersions,
    StateEvent,
    StateUnsigned,
    TextMessageEventContent,
    TrustState,
    TypingEvent,
    UserID,
    Version,
    VersionsResponse,
)
from mautrix.util import background_task, markdown
from mautrix.util.logging import TraceLogger
from mautrix.util.message_send_checkpoint import (
    CHECKPOINT_TYPES,
    MessageSendCheckpoint,
    MessageSendCheckpointReportedBy,
    MessageSendCheckpointStatus,
    MessageSendCheckpointStep,
)
from mautrix.util.opt_prometheus import Histogram

from .. import bridge as br
from . import commands as cmd

encryption_import_error = None
media_encrypt_import_error = None

try:
    from .e2ee import EncryptionManager
except ImportError as e:
    if __optional_imports__:
        raise
    encryption_import_error = e
    EncryptionManager = None

try:
    from mautrix.crypto.attachments import encrypt_attachment
except ImportError as e:
    if __optional_imports__:
        raise
    media_encrypt_import_error = e
    encrypt_attachment = None

EVENT_TIME = Histogram(
    "bridge_matrix_event", "Time spent processing Matrix events", ["event_type"]
)


class UnencryptedMessageError(DecryptionError):
    def __init__(self) -> None:
        super().__init__("unencrypted message")

    @property
    def human_message(self) -> str:
        return "the message is not encrypted"


class EncryptionUnsupportedError(DecryptionError):
    def __init__(self) -> None:
        super().__init__("encryption is not supported")

    @property
    def human_message(self) -> str:
        return "the bridge is not configured to support encryption"


class DeviceUntrustedError(DecryptionError):
    def __init__(self, trust: TrustState) -> None:
        explanation = {
            TrustState.BLACKLISTED: "device is blacklisted",
            TrustState.UNVERIFIED: "unverified",
            TrustState.UNKNOWN_DEVICE: "device info not found",
            TrustState.FORWARDED: "keys were forwarded from an unknown device",
            TrustState.CROSS_SIGNED_UNTRUSTED: (
                "cross-signing keys changed after setting up the bridge"
            ),
        }.get(trust)
        base = "your device is not trusted"
        self.message = f"{base} ({explanation})" if explanation else base
        super().__init__(self.message)

    @property
    def human_message(self) -> str:
        return self.message


class BaseMatrixHandler:
    log: TraceLogger = logging.getLogger("mau.mx")
    az: AppService
    commands: cmd.CommandProcessor
    config: config.BaseBridgeConfig
    bridge: br.Bridge
    e2ee: EncryptionManager | None
    require_e2ee: bool
    media_config: MediaRepoConfig
    versions: VersionsResponse
    minimum_spec_version: Version = SpecVersions.V11
    room_locks: dict[str, asyncio.Lock]

    user_id_prefix: str
    user_id_suffix: str

    def __init__(
        self,
        command_processor: cmd.CommandProcessor | None = None,
        bridge: br.Bridge | None = None,
    ) -> None:
        self.az = bridge.az
        self.config = bridge.config
        self.bridge = bridge
        self.commands = command_processor or cmd.CommandProcessor(bridge=bridge)
        self.media_config = MediaRepoConfig(upload_size=50 * 1024 * 1024)
        self.versions = VersionsResponse.deserialize({"versions": ["v1.3"]})
        self.az.matrix_event_handler(self.int_handle_event)
        self.room_locks = defaultdict(asyncio.Lock)

        self.e2ee = None
        self.require_e2ee = False
        if self.config["bridge.encryption.allow"]:
            if not EncryptionManager:
                self.log.fatal(
                    "Encryption enabled in config, but dependencies not installed.",
                    exc_info=encryption_import_error,
                )
                sys.exit(31)
            if not encrypt_attachment:
                self.log.fatal(
                    "Encryption enabled in config, but media encryption dependencies "
                    "not installed.",
                    exc_info=media_encrypt_import_error,
                )
                sys.exit(31)
            self.e2ee = EncryptionManager(
                bridge=bridge,
                user_id_prefix=self.user_id_prefix,
                user_id_suffix=self.user_id_suffix,
                homeserver_address=self.config["homeserver.address"],
                db_url=self.config["appservice.database"],
            )
            self.require_e2ee = self.config["bridge.encryption.require"]

        self.management_room_text = self.config.get(
            "bridge.management_room_text",
            {
                "welcome": "Hello, I'm a bridge bot.",
                "welcome_connected": "Use `help` for help.",
                "welcome_unconnected": "Use `help` for help on how to log in.",
            },
        )
        self.management_room_multiple_messages = self.config.get(
            "bridge.management_room_multiple_messages",
            False,
        )

    async def check_versions(self) -> None:
        if not self.versions.supports_at_least(self.minimum_spec_version):
            self.log.fatal(
                "The homeserver is outdated "
                "(server supports Matrix %s, but the bridge requires at least %s)",
                self.versions.latest_version,
                self.minimum_spec_version,
            )
            sys.exit(18)
        if self.bridge.homeserver_software.is_hungry and not self.versions.supports(
            "com.beeper.hungry"
        ):
            self.log.fatal(
                "The config claims the homeserver is hungryserv, "
                "but the /versions response didn't confirm it"
            )
            sys.exit(18)

    async def wait_for_connection(self) -> None:
        self.log.info("Ensuring connectivity to homeserver")
        while True:
            try:
                self.versions = await self.az.intent.versions()
                break
            except MForbidden:
                self.log.debug(
                    "/versions endpoint returned M_FORBIDDEN, "
                    "trying to register bridge bot before retrying..."
                )
                await self.az.intent.ensure_registered()
            except Exception:
                self.log.exception("Connection to homeserver failed, retrying in 10 seconds")
                await asyncio.sleep(10)
        await self.check_versions()
        try:
            await self.az.intent.whoami()
        except MForbidden:
            self.log.debug(
                "Whoami endpoint returned M_FORBIDDEN, "
                "trying to register bridge bot before retrying..."
            )
            await self.az.intent.ensure_registered()
            await self.az.intent.whoami()
        if self.versions.supports("fi.mau.msc2659.stable") or self.versions.supports_at_least(
            SpecVersions.V17
        ):
            try:
                txn_id = self.az.intent.api.get_txn_id()
                duration = await self.az.ping_self(txn_id)
                self.log.debug(
                    "Homeserver->bridge connection works, "
                    f"roundtrip time is {duration} ms (txn ID: {txn_id})"
                )
            except Exception:
                self.log.exception("Error checking homeserver -> bridge connection")
                sys.exit(16)
        else:
            self.log.debug(
                "Homeserver does not support checking status of homeserver -> bridge connection"
            )
        try:
            self.media_config = await self.az.intent.get_media_repo_config()
        except Exception:
            self.log.warning("Failed to fetch media repo config", exc_info=True)

    async def init_as_bot(self) -> None:
        self.log.debug("Initializing appservice bot")
        displayname = self.config["appservice.bot_displayname"]
        if displayname:
            try:
                await self.az.intent.set_displayname(
                    displayname if displayname != "remove" else ""
                )
            except Exception:
                self.log.exception("Failed to set bot displayname")

        avatar = self.config["appservice.bot_avatar"]
        if avatar:
            try:
                await self.az.intent.set_avatar_url(avatar if avatar != "remove" else "")
            except Exception:
                self.log.exception("Failed to set bot avatar")

        if self.bridge.homeserver_software.is_hungry and self.bridge.beeper_network_name:
            self.log.debug("Setting contact info on the appservice bot")
            await self.az.intent.beeper_update_profile(
                {
                    "com.beeper.bridge.service": self.bridge.beeper_service_name,
                    "com.beeper.bridge.network": self.bridge.beeper_network_name,
                    "com.beeper.bridge.is_bridge_bot": True,
                }
            )

    async def init_encryption(self) -> None:
        if self.e2ee:
            await self.e2ee.start()

    async def allow_message(self, user: br.BaseUser) -> bool:
        return user.is_whitelisted or (
            self.config["bridge.relay.enabled"] and user.relay_whitelisted
        )

    @staticmethod
    async def allow_command(user: br.BaseUser) -> bool:
        return user.is_whitelisted

    @staticmethod
    async def allow_bridging_message(user: br.BaseUser, portal: br.BasePortal) -> bool:
        return await user.is_logged_in() or (user.relay_whitelisted and portal.has_relay)

    @staticmethod
    async def allow_puppet_invite(user: br.BaseUser, puppet: br.BasePuppet) -> bool:
        return await user.is_logged_in()

    async def handle_leave(self, room_id: RoomID, user_id: UserID, event_id: EventID) -> None:
        pass

    async def handle_kick(
        self, room_id: RoomID, user_id: UserID, kicked_by: UserID, reason: str, event_id: EventID
    ) -> None:
        pass

    async def handle_ban(
        self, room_id: RoomID, user_id: UserID, banned_by: UserID, reason: str, event_id: EventID
    ) -> None:
        pass

    async def handle_unban(
        self, room_id: RoomID, user_id: UserID, unbanned_by: UserID, reason: str, event_id: EventID
    ) -> None:
        pass

    async def handle_join(self, room_id: RoomID, user_id: UserID, event_id: EventID) -> None:
        pass

    async def handle_knock(
        self, room_id: RoomID, user_id: UserID, reason: str, event_id: EventID
    ) -> None:
        pass

    async def handle_retract_knock(
        self, room_id: RoomID, user_id: UserID, reason: str, event_id: EventID
    ) -> None:
        pass

    async def handle_reject_knock(
        self, room_id: RoomID, user_id: UserID, sender: UserID, reason: str, event_id: EventID
    ) -> None:
        pass

    async def handle_accept_knock(
        self, room_id: RoomID, user_id: UserID, sender: UserID, reason: str, event_id: EventID
    ) -> None:
        pass

    async def handle_member_info_change(
        self,
        room_id: RoomID,
        user_id: UserID,
        content: MemberStateEventContent,
        prev_content: MemberStateEventContent,
        event_id: EventID,
    ) -> None:
        pass

    async def handle_puppet_group_invite(
        self,
        room_id: RoomID,
        puppet: br.BasePuppet,
        invited_by: br.BaseUser,
        evt: StateEvent,
        members: list[UserID],
    ) -> None:
        if self.az.bot_mxid not in members:
            await puppet.default_mxid_intent.leave_room(
                room_id, reason="This ghost does not join multi-user rooms without the bridge bot."
            )

    async def handle_puppet_dm_invite(
        self, room_id: RoomID, puppet: br.BasePuppet, invited_by: br.BaseUser, evt: StateEvent
    ) -> None:
        portal = await invited_by.get_portal_with(puppet)
        if portal:
            await portal.accept_matrix_dm(room_id, invited_by, puppet)
        else:
            await puppet.default_mxid_intent.leave_room(
                room_id, reason="This bridge does not support creating DMs."
            )

    async def handle_puppet_space_invite(
        self, room_id: RoomID, puppet: br.BasePuppet, invited_by: br.BaseUser, evt: StateEvent
    ) -> None:
        await puppet.default_mxid_intent.leave_room(
            room_id, reason="This ghost does not join spaces."
        )

    async def handle_puppet_nonportal_invite(
        self, room_id: RoomID, puppet: br.BasePuppet, invited_by: br.BaseUser, evt: StateEvent
    ) -> None:
        intent = puppet.default_mxid_intent
        await intent.join_room(room_id)
        try:
            create_evt = await intent.get_state_event(room_id, EventType.ROOM_CREATE)
            members = await intent.get_room_members(room_id)
        except MatrixError:
            self.log.exception(f"Failed to get state after joining {room_id} as {intent.mxid}")
            background_task.create(intent.leave_room(room_id, reason="Internal error"))
            return
        if create_evt.type == RoomType.SPACE:
            await self.handle_puppet_space_invite(room_id, puppet, invited_by, evt)
        elif len(members) > 2 or not evt.content.is_direct:
            await self.handle_puppet_group_invite(room_id, puppet, invited_by, evt, members)
        else:
            await self.handle_puppet_dm_invite(room_id, puppet, invited_by, evt)

    async def handle_puppet_invite(
        self, room_id: RoomID, puppet: br.BasePuppet, invited_by: br.BaseUser, evt: StateEvent
    ) -> None:
        intent = puppet.default_mxid_intent
        if not await self.allow_puppet_invite(invited_by, puppet):
            self.log.debug(f"Rejecting invite for {intent.mxid} to {room_id}: user can't invite")
            await intent.leave_room(room_id, reason="You're not allowed to invite this ghost.")
            return

        async with self.room_locks[room_id]:
            portal = await self.bridge.get_portal(room_id)
            if portal:
                try:
                    await portal.handle_matrix_invite(invited_by, puppet)
                except br.RejectMatrixInvite as e:
                    await intent.leave_room(room_id, reason=e.message)
                except br.IgnoreMatrixInvite:
                    pass
                else:
                    await intent.join_room(room_id)
                return
            else:
                await self.handle_puppet_nonportal_invite(room_id, puppet, invited_by, evt)

    async def handle_invite(
        self, room_id: RoomID, user_id: UserID, invited_by: br.BaseUser, evt: StateEvent
    ) -> None:
        pass

    async def handle_reject(
        self, room_id: RoomID, user_id: UserID, reason: str, event_id: EventID
    ) -> None:
        pass

    async def handle_disinvite(
        self,
        room_id: RoomID,
        user_id: UserID,
        disinvited_by: UserID,
        reason: str,
        event_id: EventID,
    ) -> None:
        pass

    async def handle_event(self, evt: Event) -> None:
        """
        Called by :meth:`int_handle_event` for message events other than m.room.message.

        **N.B.** You may need to add the event class to :attr:`allowed_event_classes`
        or override :meth:`allow_matrix_event` for it to reach here.
        """

    async def handle_state_event(self, evt: StateEvent) -> None:
        """
        Called by :meth:`int_handle_event` for state events other than m.room.membership.

        **N.B.** You may need to add the event class to :attr:`allowed_event_classes`
        or override :meth:`allow_matrix_event` for it to reach here.
        """

    async def handle_ephemeral_event(
        self, evt: ReceiptEvent | PresenceEvent | TypingEvent
    ) -> None:
        if evt.type == EventType.RECEIPT:
            await self.handle_receipt(evt)

    async def send_permission_error(self, room_id: RoomID) -> None:
        await self.az.intent.send_notice(
            room_id,
            text=(
                "You are not whitelisted to use this bridge.\n\n"
                "If you are the owner of this bridge, see the bridge.permissions "
                "section in your config file."
            ),
            html=(
                "<p>You are not whitelisted to use this bridge.</p>"
                "<p>If you are the owner of this bridge, see the "
                "<code>bridge.permissions</code> section in your config file.</p>"
            ),
        )

    async def accept_bot_invite(self, room_id: RoomID, inviter: br.BaseUser) -> None:
        try:
            await self.az.intent.join_room(room_id)
        except Exception:
            self.log.exception(f"Failed to join room {room_id} as bridge bot")
            return

        if not await self.allow_command(inviter):
            await self.send_permission_error(room_id)
            await self.az.intent.leave_room(room_id)
            return

        await self.send_welcome_message(room_id, inviter)

    async def send_welcome_message(self, room_id: RoomID, inviter: br.BaseUser) -> None:
        has_two_members, bridge_bot_in_room = await self._is_direct_chat(room_id)
        is_management = has_two_members and bridge_bot_in_room

        welcome_messages = [self.management_room_text.get("welcome")]

        if is_management:
            if await inviter.is_logged_in():
                welcome_messages.append(self.management_room_text.get("welcome_connected"))
            else:
                welcome_messages.append(self.management_room_text.get("welcome_unconnected"))

            additional_help = self.management_room_text.get("additional_help")
            if additional_help:
                welcome_messages.append(additional_help)
        else:
            cmd_prefix = self.commands.command_prefix
            welcome_messages.append(f"Use `{cmd_prefix} help` for help.")

        if self.management_room_multiple_messages:
            for m in welcome_messages:
                await self.az.intent.send_notice(room_id, text=m, html=markdown.render(m))
        else:
            combined = "\n".join(welcome_messages)
            combined_html = "".join(map(markdown.render, welcome_messages))
            await self.az.intent.send_notice(room_id, text=combined, html=combined_html)

    async def int_handle_invite(self, evt: StateEvent) -> None:
        self.log.debug(f"{evt.sender} invited {evt.state_key} to {evt.room_id}")
        inviter = await self.bridge.get_user(evt.sender)
        if inviter is None:
            self.log.exception(f"Failed to find user with Matrix ID {evt.sender}")
            return
        elif evt.state_key == self.az.bot_mxid:
            await self.accept_bot_invite(evt.room_id, inviter)
            return

        puppet = await self.bridge.get_puppet(UserID(evt.state_key))
        if puppet:
            await self.handle_puppet_invite(evt.room_id, puppet, inviter, evt)
            return

        await self.handle_invite(evt.room_id, UserID(evt.state_key), inviter, evt)

    def is_command(self, message: MessageEventContent) -> tuple[bool, str]:
        text = message.body
        prefix = self.config["bridge.command_prefix"]
        is_command = text.startswith(prefix)
        if is_command:
            text = text[len(prefix) + 1 :].lstrip()
        return is_command, text

    async def _send_mss(
        self,
        evt: Event,
        status: MessageStatus,
        reason: MessageStatusReason | None = None,
        error: str | None = None,
        message: str | None = None,
    ) -> None:
        if not self.config.get("bridge.message_status_events", False):
            return
        status_content = BeeperMessageStatusEventContent(
            network="",  # TODO set network properly
            relates_to=RelatesTo(rel_type=RelationType.REFERENCE, event_id=evt.event_id),
            status=status,
            reason=reason,
            error=error,
            message=message,
        )
        await self.az.intent.send_message_event(
            evt.room_id, EventType.BEEPER_MESSAGE_STATUS, status_content
        )

    async def _send_crypto_status_error(
        self,
        evt: Event,
        err: DecryptionError | None = None,
        retry_num: int = 0,
        is_final: bool = True,
        edit: EventID | None = None,
        wait_for: int | None = None,
    ) -> EventID | None:
        msg = str(err)
        if isinstance(err, (SessionNotFound, UnencryptedMessageError)):
            msg = err.human_message
        self._send_message_checkpoint(
            evt, MessageSendCheckpointStep.DECRYPTED, msg, permanent=is_final, retry_num=retry_num
        )

        if wait_for:
            msg += f". The bridge will retry for {wait_for} seconds"
        full_msg = f"\u26a0 Your message was not bridged: {msg}."
        if isinstance(err, EncryptionUnsupportedError):
            full_msg = "🔒️ This bridge has not been configured to support encryption"
        event_id = None
        if self.config.get("bridge.delivery_error_reports", True):
            try:
                content = TextMessageEventContent(msgtype=MessageType.NOTICE, body=full_msg)
                if edit:
                    content.set_edit(edit)
                event_id = await self.az.intent.send_message(evt.room_id, content)
            except IntentError:
                self.log.debug("IntentError while sending encryption error", exc_info=True)
                self.log.error(
                    "Got IntentError while trying to send encryption error message. "
                    "This likely means the bridge bot is not in the room, which can "
                    "happen if you force-enable e2ee on the homeserver without enabling "
                    "it by default on the bridge (bridge -> encryption -> default)."
                )

        await self._send_mss(
            evt,
            status=MessageStatus.RETRIABLE if is_final else MessageStatus.PENDING,
            reason=MessageStatusReason.UNDECRYPTABLE,
            error=str(err),
            message=err.human_message if err else None,
        )

        return event_id

    async def handle_message(self, evt: MessageEvent, was_encrypted: bool = False) -> None:
        room_id = evt.room_id
        user_id = evt.sender
        event_id = evt.event_id
        message = evt.content

        if not was_encrypted and self.require_e2ee:
            self.log.warning(f"Dropping {event_id} from {user_id} as it's not encrypted!")
            await self._send_crypto_status_error(evt, UnencryptedMessageError(), 0)
            return

        sender = await self.bridge.get_user(user_id)
        if not sender or not await self.allow_message(sender):
            self.log.debug(
                f"Ignoring message {event_id} from {user_id} to {room_id}:"
                " user is not whitelisted."
            )
            self._send_message_checkpoint(
                evt, MessageSendCheckpointStep.BRIDGE, "user is not whitelisted"
            )
            return
        self.log.debug(f"Received Matrix event {event_id} from {sender.mxid} in {room_id}")
        self.log.trace("Event %s content: %s", event_id, message)

        if isinstance(message, TextMessageEventContent):
            message.trim_reply_fallback()

        is_command, text = self.is_command(message)
        portal = await self.bridge.get_portal(room_id)
        if not is_command and portal:
            if await self.allow_bridging_message(sender, portal):
                await portal.handle_matrix_message(sender, message, event_id)
            else:
                self.log.debug(
                    f"Ignoring event {event_id} from {sender.mxid}:"
                    " not allowed to send to portal"
                )
                self._send_message_checkpoint(
                    evt,
                    MessageSendCheckpointStep.BRIDGE,
                    "user is not allowed to send to the portal",
                )
            return

        if message.msgtype != MessageType.TEXT:
            self.log.debug(
                f"Ignoring event {event_id}: not a portal room and not a m.text message"
            )
            self._send_message_checkpoint(
                evt, MessageSendCheckpointStep.BRIDGE, "not a portal room and not a m.text message"
            )
            return
        elif not await self.allow_command(sender):
            self.log.debug(
                f"Ignoring command {event_id} from {sender.mxid}: not allowed to run commands"
            )
            self._send_message_checkpoint(
                evt, MessageSendCheckpointStep.COMMAND, "not allowed to run commands"
            )
            return

        has_two_members, bridge_bot_in_room = await self._is_direct_chat(room_id)
        is_management = has_two_members and bridge_bot_in_room

        if is_command or is_management:
            try:
                command, arguments = text.split(" ", 1)
                args = arguments.split(" ")
            except ValueError:
                # Not enough values to unpack, i.e. no arguments
                command = text
                args = []

            try:
                await self.commands.handle(
                    room_id,
                    event_id,
                    sender,
                    command,
                    args,
                    message,
                    portal,
                    is_management,
                    bridge_bot_in_room,
                )
            except Exception as e:
                self.log.debug(f"Error handling command {command} from {sender}: {e}")
                self._send_message_checkpoint(evt, MessageSendCheckpointStep.COMMAND, e)
                await self._send_mss(
                    evt,
                    status=MessageStatus.FAIL,
                    reason=MessageStatusReason.GENERIC_ERROR,
                    error="",
                    message="Command execution failed",
                )
            else:
                await MessageSendCheckpoint(
                    event_id=event_id,
                    room_id=room_id,
                    step=MessageSendCheckpointStep.COMMAND,
                    timestamp=int(time.time() * 1000),
                    status=MessageSendCheckpointStatus.SUCCESS,
                    reported_by=MessageSendCheckpointReportedBy.BRIDGE,
                    event_type=EventType.ROOM_MESSAGE,
                    message_type=message.msgtype,
                ).send(
                    self.bridge.config["homeserver.message_send_checkpoint_endpoint"],
                    self.az.as_token,
                    self.log,
                )
                await self._send_mss(evt, status=MessageStatus.SUCCESS)
        else:
            self.log.debug(
                f"Ignoring event {event_id} from {sender.mxid}:"
                " not a command and not a portal room"
            )
            self._send_message_checkpoint(
                evt, MessageSendCheckpointStep.COMMAND, "not a command and not a portal room"
            )
            await self._send_mss(
                evt,
                status=MessageStatus.FAIL,
                reason=MessageStatusReason.UNSUPPORTED,
                error="Unknown room",
                message="Unknown room",
            )

    async def _is_direct_chat(self, room_id: RoomID) -> tuple[bool, bool]:
        try:
            members = await self.az.intent.get_room_members(room_id)
            return len(members) == 2, self.az.bot_mxid in members
        except MatrixError:
            return False, False

    async def handle_receipt(self, evt: ReceiptEvent) -> None:
        for event_id, receipts in evt.content.items():
            for user_id, data in receipts.get(ReceiptType.READ, {}).items():
                user = await self.bridge.get_user(user_id, create=False)
                if not user or not await user.is_logged_in():
                    continue

                portal = await self.bridge.get_portal(evt.room_id)
                if not portal:
                    continue

                await portal.schedule_disappearing()

                if (
                    data.get(DOUBLE_PUPPET_SOURCE_KEY) == self.az.bridge_name
                    and await self.bridge.get_double_puppet(user_id) is not None
                ):
                    continue

                await self.handle_read_receipt(user, portal, event_id, data)

    async def handle_read_receipt(
        self,
        user: br.BaseUser,
        portal: br.BasePortal,
        event_id: EventID,
        data: SingleReceiptEventContent,
    ) -> None:
        pass

    async def try_handle_sync_event(self, evt: Event) -> None:
        try:
            if isinstance(evt, (ReceiptEvent, PresenceEvent, TypingEvent)):
                await self.handle_ephemeral_event(evt)
            else:
                self.log.trace("Unknown event type received from sync: %s", evt)
        except Exception:
            self.log.exception("Error handling manually received Matrix event")

    async def _post_decrypt(
        self, evt: Event, retry_num: int = 0, error_event_id: EventID | None = None
    ) -> None:
        trust_state = evt["mautrix"]["trust_state"]
        if trust_state < self.e2ee.min_send_trust:
            self.log.warning(
                f"Dropping {evt.event_id} from {evt.sender} due to insufficient verification level"
                f" (event: {trust_state}, required: {self.e2ee.min_send_trust})"
            )
            await self._send_crypto_status_error(
                evt,
                retry_num=retry_num,
                err=DeviceUntrustedError(trust_state),
                edit=error_event_id,
            )
            return

        self._send_message_checkpoint(
            evt, MessageSendCheckpointStep.DECRYPTED, retry_num=retry_num
        )
        if error_event_id:
            await self.az.intent.redact(evt.room_id, error_event_id)
        await self.int_handle_event(evt, was_encrypted=True)

    async def handle_encrypted(self, evt: EncryptedEvent) -> None:
        if not self.e2ee:
            self.log.debug(
                "Got encrypted message %s from %s, but encryption is not enabled",
                evt.event_id,
                evt.sender,
            )
            await self._send_crypto_status_error(evt, EncryptionUnsupportedError())
            return
        try:
            decrypted = await self.e2ee.decrypt(evt, wait_session_timeout=3)
        except SessionNotFound as e:
            await self._handle_encrypted_wait(evt, e, wait=22)
        except DecryptionError as e:
            self.log.warning(f"Failed to decrypt {evt.event_id}: {e}")
            self.log.trace("%s decryption traceback:", evt.event_id, exc_info=True)
            await self._send_crypto_status_error(evt, e)
        else:
            await self._post_decrypt(decrypted)

    async def _handle_encrypted_wait(
        self, evt: EncryptedEvent, err: SessionNotFound, wait: int
    ) -> None:
        self.log.debug(
            f"Couldn't find session {err.session_id} trying to decrypt {evt.event_id},"
            " waiting even longer"
        )
        background_task.create(
            self.e2ee.crypto.request_room_key(
                evt.room_id,
                evt.content.sender_key,
                evt.content.session_id,
                from_devices={evt.sender: [evt.content.device_id]},
            )
        )
        event_id = await self._send_crypto_status_error(evt, err, is_final=False, wait_for=wait)
        got_keys = await self.e2ee.crypto.wait_for_session(
            evt.room_id, err.session_id, timeout=wait
        )
        if got_keys:
            self.log.debug(
                f"Got session {err.session_id} after waiting more, "
                f"trying to decrypt {evt.event_id} again"
            )
            try:
                decrypted = await self.e2ee.decrypt(evt, wait_session_timeout=0)
            except DecryptionError as e:
                await self._send_crypto_status_error(evt, e, retry_num=1, edit=event_id)
                self.log.warning(f"Failed to decrypt {evt.event_id}: {e}")
                self.log.trace("%s decryption traceback:", evt.event_id, exc_info=True)
            else:
                await self._post_decrypt(decrypted, retry_num=1, error_event_id=event_id)
                return
        else:
            self.log.warning(f"Didn't get {err.session_id}, giving up on {evt.event_id}")
            await self._send_crypto_status_error(
                evt, SessionNotFound(err.session_id), retry_num=1, edit=event_id
            )

    async def handle_encryption(self, evt: StateEvent) -> None:
        await self.az.state_store.set_encryption_info(evt.room_id, evt.content)
        portal = await self.bridge.get_portal(evt.room_id)
        if portal:
            portal.encrypted = True
            await portal.save()
            if portal.is_direct:
                portal.log.debug("Received encryption event in direct portal: %s", evt.content)
                await portal.enable_dm_encryption()

    def _send_message_checkpoint(
        self,
        evt: Event,
        step: MessageSendCheckpointStep,
        err: Exception | str | None = None,
        permanent: bool = True,
        retry_num: int = 0,
    ) -> None:
        endpoint = self.bridge.config["homeserver.message_send_checkpoint_endpoint"]
        if not endpoint:
            return
        if evt.type not in CHECKPOINT_TYPES:
            return

        self.log.debug(f"Sending message send checkpoint for {evt.event_id} (step: {step})")
        status = MessageSendCheckpointStatus.SUCCESS
        if err:
            status = (
                MessageSendCheckpointStatus.PERM_FAILURE
                if permanent
                else MessageSendCheckpointStatus.WILL_RETRY
            )

        checkpoint = MessageSendCheckpoint(
            event_id=evt.event_id,
            room_id=evt.room_id,
            step=step,
            timestamp=int(time.time() * 1000),
            status=status,
            reported_by=MessageSendCheckpointReportedBy.BRIDGE,
            event_type=evt.type,
            message_type=evt.content.msgtype if evt.type == EventType.ROOM_MESSAGE else None,
            info=str(err) if err else None,
            retry_num=retry_num,
        )
        background_task.create(checkpoint.send(endpoint, self.az.as_token, self.log))

    allowed_event_classes: tuple[type, ...] = (
        MessageEvent,
        StateEvent,
        ReactionEvent,
        EncryptedEvent,
        RedactionEvent,
        ReceiptEvent,
        TypingEvent,
        PresenceEvent,
    )

    async def allow_matrix_event(self, evt: Event) -> bool:
        # If the event is not one of the allowed classes, ignore it.
        if not isinstance(evt, self.allowed_event_classes):
            return False
        # For room events, make sure the message didn't originate from the bridge.
        if isinstance(evt, BaseRoomEvent):
            # If the event is from a bridge ghost, ignore it.
            if evt.sender == self.az.bot_mxid or self.bridge.is_bridge_ghost(evt.sender):
                return False
            # If the event is marked as double puppeted and we can confirm that we are in fact
            # double puppeting that user ID, ignore it.
            if (
                evt.content.get(DOUBLE_PUPPET_SOURCE_KEY) == self.az.bridge_name
                and await self.bridge.get_double_puppet(evt.sender) is not None
            ):
                return False
        # For non-room events and non-bridge-originated room events, allow.
        return True

    async def int_handle_event(self, evt: Event, was_encrypted: bool = False) -> None:
        if isinstance(evt, StateEvent) and evt.type == EventType.ROOM_MEMBER and self.e2ee:
            await self.e2ee.handle_member_event(evt)
        if not await self.allow_matrix_event(evt):
            return
        self.log.trace("Received event: %s", evt)

        if not was_encrypted:
            self._send_message_checkpoint(evt, MessageSendCheckpointStep.BRIDGE)
        start_time = time.time()

        if evt.type == EventType.ROOM_MEMBER:
            evt: StateEvent
            unsigned = evt.unsigned or StateUnsigned()
            prev_content = unsigned.prev_content or MemberStateEventContent()
            prev_membership = prev_content.membership if prev_content else Membership.JOIN
            if evt.content.membership == Membership.INVITE:
                if prev_membership == Membership.KNOCK:
                    await self.handle_accept_knock(
                        evt.room_id,
                        UserID(evt.state_key),
                        evt.sender,
                        evt.content.reason,
                        evt.event_id,
                    )
                else:
                    await self.int_handle_invite(evt)
            elif evt.content.membership == Membership.LEAVE:
                if prev_membership == Membership.BAN:
                    await self.handle_unban(
                        evt.room_id,
                        UserID(evt.state_key),
                        evt.sender,
                        evt.content.reason,
                        evt.event_id,
                    )
                elif prev_membership == Membership.INVITE:
                    if evt.sender == evt.state_key:
                        await self.handle_reject(
                            evt.room_id, UserID(evt.state_key), evt.content.reason, evt.event_id
                        )
                    else:
                        await self.handle_disinvite(
                            evt.room_id,
                            UserID(evt.state_key),
                            evt.sender,
                            evt.content.reason,
                            evt.event_id,
                        )
                elif prev_membership == Membership.KNOCK:
                    if evt.sender == evt.state_key:
                        await self.handle_retract_knock(
                            evt.room_id, UserID(evt.state_key), evt.content.reason, evt.event_id
                        )
                    else:
                        await self.handle_reject_knock(
                            evt.room_id,
                            UserID(evt.state_key),
                            evt.sender,
                            evt.content.reason,
                            evt.event_id,
                        )

                elif evt.sender == evt.state_key:
                    await self.handle_leave(evt.room_id, UserID(evt.state_key), evt.event_id)
                else:
                    await self.handle_kick(
                        evt.room_id,
                        UserID(evt.state_key),
                        evt.sender,
                        evt.content.reason,
                        evt.event_id,
                    )
            elif evt.content.membership == Membership.BAN:
                await self.handle_ban(
                    evt.room_id,
                    UserID(evt.state_key),
                    evt.sender,
                    evt.content.reason,
                    evt.event_id,
                )
            elif evt.content.membership == Membership.JOIN:
                if prev_membership != Membership.JOIN:
                    await self.handle_join(evt.room_id, UserID(evt.state_key), evt.event_id)
                else:
                    await self.handle_member_info_change(
                        evt.room_id, UserID(evt.state_key), evt.content, prev_content, evt.event_id
                    )
            elif evt.content.membership == Membership.KNOCK:
                await self.handle_knock(
                    evt.room_id,
                    UserID(evt.state_key),
                    evt.content.reason,
                    evt.event_id,
                )
        elif evt.type in (EventType.ROOM_MESSAGE, EventType.STICKER):
            evt: MessageEvent
            if evt.type != EventType.ROOM_MESSAGE:
                evt.content.msgtype = MessageType(str(evt.type))
            await self.handle_message(evt, was_encrypted=was_encrypted)
        elif evt.type == EventType.ROOM_ENCRYPTED:
            await self.handle_encrypted(evt)
        elif evt.type == EventType.ROOM_ENCRYPTION:
            await self.handle_encryption(evt)
        else:
            if evt.type.is_state and isinstance(evt, StateEvent):
                await self.handle_state_event(evt)
            elif evt.type.is_ephemeral and isinstance(
                evt, (PresenceEvent, TypingEvent, ReceiptEvent)
            ):
                await self.handle_ephemeral_event(evt)
            else:
                await self.handle_event(evt)

        await self.log_event_handle_duration(evt, time.time() - start_time)

    async def log_event_handle_duration(self, evt: Event, duration: float) -> None:
        EVENT_TIME.labels(event_type=str(evt.type)).observe(duration)
