Source code for discord.webhook.async_

"""
The MIT License (MIT)

Copyright (c) 2015-2021 Rapptz
Copyright (c) 2021-present Pycord Development

Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""

from __future__ import annotations

import asyncio
import json
import logging
import re
import weakref
from contextvars import ContextVar
from typing import TYPE_CHECKING, Any, Literal, NamedTuple, overload
from urllib.parse import quote as urlquote

import aiohttp

from .. import utils
from ..asset import Asset
from ..channel import ForumChannel, PartialMessageable
from ..enums import WebhookType, try_enum
from ..errors import (
    DiscordServerError,
    Forbidden,
    HTTPException,
    InvalidArgument,
    NotFound,
)
from ..flags import MessageFlags
from ..http import Route
from ..message import Attachment, Message
from ..mixins import Hashable
from ..object import Object
from ..threads import Thread
from ..user import BaseUser, User

__all__ = (
    "Webhook",
    "WebhookMessage",
    "PartialWebhookChannel",
    "PartialWebhookGuild",
)

_log = logging.getLogger(__name__)

if TYPE_CHECKING:
    import datetime

    from ..abc import Snowflake
    from ..channel import TextChannel
    from ..embeds import Embed
    from ..file import File
    from ..guild import Guild
    from ..http import Response
    from ..mentions import AllowedMentions
    from ..poll import Poll
    from ..state import ConnectionState
    from ..types.message import Message as MessagePayload
    from ..types.webhook import FollowerWebhook as FollowerWebhookPayload
    from ..types.webhook import Webhook as WebhookPayload
    from ..ui.view import View

MISSING = utils.MISSING


class AsyncDeferredLock:
    def __init__(self, lock: asyncio.Lock):
        self.lock = lock
        self.delta: float | None = None

    async def __aenter__(self):
        await self.lock.acquire()
        return self

    def delay_by(self, delta: float) -> None:
        self.delta = delta

    async def __aexit__(self, type, value, traceback):
        if self.delta:
            await asyncio.sleep(self.delta)
        self.lock.release()


class AsyncWebhookAdapter:
    def __init__(self):
        self._locks: weakref.WeakValueDictionary = weakref.WeakValueDictionary()

    async def request(
        self,
        route: Route,
        session: aiohttp.ClientSession,
        *,
        payload: dict[str, Any] | None = None,
        multipart: list[dict[str, Any]] | None = None,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
        files: list[File] | None = None,
        reason: str | None = None,
        auth_token: str | None = None,
        params: dict[str, Any] | None = None,
    ) -> Any:
        headers: dict[str, str] = {}
        files = files or []
        to_send: str | aiohttp.FormData | None = None
        bucket = (route.webhook_id, route.webhook_token)

        try:
            lock = self._locks[bucket]
        except KeyError:
            self._locks[bucket] = lock = asyncio.Lock()

        if payload is not None:
            headers["Content-Type"] = "application/json"
            to_send = utils._to_json(payload)

        if auth_token is not None:
            headers["Authorization"] = f"Bot {auth_token}"

        if reason is not None:
            headers["X-Audit-Log-Reason"] = urlquote(reason, safe="/ ")

        response: aiohttp.ClientResponse | None = None
        data: dict[str, Any] | str | None = None
        method = route.method
        url = route.url
        webhook_id = route.webhook_id

        async with AsyncDeferredLock(lock) as lock:
            for attempt in range(5):
                for file in files:
                    file.reset(seek=attempt)

                if multipart:
                    form_data = aiohttp.FormData(quote_fields=False)
                    for p in multipart:
                        form_data.add_field(**p)
                    to_send = form_data
                try:
                    async with session.request(
                        method,
                        url,
                        data=to_send,
                        headers=headers,
                        params=params,
                        proxy=proxy,
                        proxy_auth=proxy_auth,
                    ) as response:
                        _log.debug(
                            "Webhook ID %s with %s %s has returned status code %s",
                            webhook_id,
                            method,
                            url,
                            response.status,
                        )
                        data = (await response.text(encoding="utf-8")) or None
                        if (
                            data
                            and response.headers["Content-Type"] == "application/json"
                        ):
                            data = json.loads(data)

                        remaining = response.headers.get("X-Ratelimit-Remaining")
                        if remaining == "0" and response.status != 429:
                            delta = utils._parse_ratelimit_header(response)
                            _log.debug(
                                (
                                    "Webhook ID %s has been pre-emptively rate limited,"
                                    " waiting %.2f seconds"
                                ),
                                webhook_id,
                                delta,
                            )
                            lock.delay_by(delta)

                        if 300 > response.status >= 200:
                            return data

                        if response.status == 429:
                            if not response.headers.get("Via"):
                                raise HTTPException(response, data)

                            retry_after: float = data["retry_after"]  # type: ignore
                            _log.warning(
                                (
                                    "Webhook ID %s is rate limited. Retrying in %.2f"
                                    " seconds"
                                ),
                                webhook_id,
                                retry_after,
                            )
                            await asyncio.sleep(retry_after)
                            continue

                        if response.status >= 500:
                            await asyncio.sleep(1 + attempt * 2)
                            continue

                        if response.status == 403:
                            raise Forbidden(response, data)
                        elif response.status == 404:
                            raise NotFound(response, data)
                        else:
                            raise HTTPException(response, data)

                except OSError as e:
                    if attempt < 4 and e.errno in (54, 10054):
                        await asyncio.sleep(1 + attempt * 2)
                        continue
                    raise

            if response:
                if response.status >= 500:
                    raise DiscordServerError(response, data)
                raise HTTPException(response, data)

            raise RuntimeError("Unreachable code in HTTP handling.")

    def delete_webhook(
        self,
        webhook_id: int,
        *,
        token: str | None = None,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
        reason: str | None = None,
    ) -> Response[None]:
        route = Route("DELETE", "/webhooks/{webhook_id}", webhook_id=webhook_id)
        return self.request(
            route,
            session=session,
            proxy=proxy,
            proxy_auth=proxy_auth,
            reason=reason,
            auth_token=token,
        )

    def delete_webhook_with_token(
        self,
        webhook_id: int,
        token: str,
        *,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
        reason: str | None = None,
    ) -> Response[None]:
        route = Route(
            "DELETE",
            "/webhooks/{webhook_id}/{webhook_token}",
            webhook_id=webhook_id,
            webhook_token=token,
        )
        return self.request(
            route, session=session, proxy=proxy, proxy_auth=proxy_auth, reason=reason
        )

    def edit_webhook(
        self,
        webhook_id: int,
        token: str,
        payload: dict[str, Any],
        *,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
        reason: str | None = None,
    ) -> Response[WebhookPayload]:
        route = Route("PATCH", "/webhooks/{webhook_id}", webhook_id=webhook_id)
        return self.request(
            route,
            session=session,
            proxy=proxy,
            proxy_auth=proxy_auth,
            reason=reason,
            payload=payload,
            auth_token=token,
        )

    def edit_webhook_with_token(
        self,
        webhook_id: int,
        token: str,
        payload: dict[str, Any],
        *,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
        reason: str | None = None,
    ) -> Response[WebhookPayload]:
        route = Route(
            "PATCH",
            "/webhooks/{webhook_id}/{webhook_token}",
            webhook_id=webhook_id,
            webhook_token=token,
        )
        return self.request(
            route,
            session=session,
            proxy=proxy,
            proxy_auth=proxy_auth,
            reason=reason,
            payload=payload,
        )

    def execute_webhook(
        self,
        webhook_id: int,
        token: str,
        *,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
        payload: dict[str, Any] | None = None,
        multipart: list[dict[str, Any]] | None = None,
        files: list[File] | None = None,
        thread_id: int | None = None,
        thread_name: str | None = None,
        wait: bool = False,
    ) -> Response[MessagePayload | None]:
        params = {"wait": int(wait)}
        if thread_id:
            params["thread_id"] = thread_id

        if thread_name:
            payload["thread_name"] = thread_name

        route = Route(
            "POST",
            "/webhooks/{webhook_id}/{webhook_token}",
            webhook_id=webhook_id,
            webhook_token=token,
        )
        return self.request(
            route,
            session,
            payload=payload,
            proxy=proxy,
            proxy_auth=proxy_auth,
            multipart=multipart,
            files=files,
            params=params,
        )

    def get_webhook_message(
        self,
        webhook_id: int,
        token: str,
        message_id: int,
        *,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
        thread_id: int | None = None,
    ) -> Response[MessagePayload]:
        params = {}

        if thread_id:
            params["thread_id"] = thread_id

        route = Route(
            "GET",
            "/webhooks/{webhook_id}/{webhook_token}/messages/{message_id}",
            webhook_id=webhook_id,
            webhook_token=token,
            message_id=message_id,
        )
        return self.request(
            route, session=session, proxy=proxy, proxy_auth=proxy_auth, params=params
        )

    def edit_webhook_message(
        self,
        webhook_id: int,
        token: str,
        message_id: int,
        *,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
        thread_id: int | None = None,
        payload: dict[str, Any] | None = None,
        multipart: list[dict[str, Any]] | None = None,
        files: list[File] | None = None,
    ) -> Response[WebhookMessage]:
        params = {}

        if thread_id:
            params["thread_id"] = thread_id

        route = Route(
            "PATCH",
            "/webhooks/{webhook_id}/{webhook_token}/messages/{message_id}",
            webhook_id=webhook_id,
            webhook_token=token,
            message_id=message_id,
        )
        return self.request(
            route,
            session,
            params=params,
            payload=payload,
            proxy=proxy,
            proxy_auth=proxy_auth,
            multipart=multipart,
            files=files,
        )

    def delete_webhook_message(
        self,
        webhook_id: int,
        token: str,
        message_id: int,
        *,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
        thread_id: int | None = None,
    ) -> Response[None]:
        params = {}

        if thread_id:
            params["thread_id"] = thread_id

        route = Route(
            "DELETE",
            "/webhooks/{webhook_id}/{webhook_token}/messages/{message_id}",
            webhook_id=webhook_id,
            webhook_token=token,
            message_id=message_id,
        )
        return self.request(
            route, session=session, proxy=proxy, proxy_auth=proxy_auth, params=params
        )

    def fetch_webhook(
        self,
        webhook_id: int,
        token: str,
        *,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
    ) -> Response[WebhookPayload | FollowerWebhookPayload]:
        route = Route("GET", "/webhooks/{webhook_id}", webhook_id=webhook_id)
        return self.request(
            route, session=session, proxy=proxy, proxy_auth=proxy_auth, auth_token=token
        )

    def fetch_webhook_with_token(
        self,
        webhook_id: int,
        token: str,
        *,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
    ) -> Response[WebhookPayload | FollowerWebhookPayload]:
        route = Route(
            "GET",
            "/webhooks/{webhook_id}/{webhook_token}",
            webhook_id=webhook_id,
            webhook_token=token,
        )
        return self.request(route, session=session, proxy=proxy, proxy_auth=proxy_auth)

    def create_interaction_response(
        self,
        interaction_id: int,
        token: str,
        *,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
        type: int,
        data: dict[str, Any] | None = None,
        files: list[File] = None,
    ) -> Response[None]:
        payload: dict[str, Any] = {
            "type": type,
        }

        if data is not None:
            payload["data"] = data
        form = [{"name": "payload_json"}]
        attachments = []
        files = files or []
        for index, file in enumerate(files):
            attachments.append(
                {
                    "id": index,
                    "filename": file.filename,
                    "description": file.description,
                }
            )
            form.append(
                {
                    "name": f"files[{index}]",
                    "value": file.fp,
                    "filename": file.filename,
                    "content_type": "application/octet-stream",
                }
            )
        payload["attachments"] = attachments
        form[0]["value"] = utils._to_json(payload)

        route = Route(
            "POST",
            "/interactions/{webhook_id}/{webhook_token}/callback",
            webhook_id=interaction_id,
            webhook_token=token,
        )

        return self.request(
            route,
            session=session,
            proxy=proxy,
            proxy_auth=proxy_auth,
            files=files,
            multipart=form,
        )

    def get_original_interaction_response(
        self,
        application_id: int,
        token: str,
        *,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
    ) -> Response[MessagePayload]:
        r = Route(
            "GET",
            "/webhooks/{webhook_id}/{webhook_token}/messages/@original",
            webhook_id=application_id,
            webhook_token=token,
        )
        return self.request(r, session=session, proxy=proxy, proxy_auth=proxy_auth)

    def edit_original_interaction_response(
        self,
        application_id: int,
        token: str,
        *,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
        payload: dict[str, Any] | None = None,
        multipart: list[dict[str, Any]] | None = None,
        files: list[File] | None = None,
    ) -> Response[MessagePayload]:
        r = Route(
            "PATCH",
            "/webhooks/{webhook_id}/{webhook_token}/messages/@original",
            webhook_id=application_id,
            webhook_token=token,
        )
        return self.request(
            r,
            session=session,
            proxy=proxy,
            proxy_auth=proxy_auth,
            payload=payload,
            multipart=multipart,
            files=files,
        )

    def delete_original_interaction_response(
        self,
        application_id: int,
        token: str,
        *,
        session: aiohttp.ClientSession,
        proxy: str | None = None,
        proxy_auth: aiohttp.BasicAuth | None = None,
    ) -> Response[None]:
        r = Route(
            "DELETE",
            "/webhooks/{webhook_id}/{webhook_token}/messages/@original",
            webhook_id=application_id,
            webhook_token=token,
        )
        return self.request(r, session=session, proxy=proxy, proxy_auth=proxy_auth)


class ExecuteWebhookParameters(NamedTuple):
    payload: dict[str, Any] | None
    multipart: list[dict[str, Any]] | None
    files: list[File] | None


def handle_message_parameters(
    content: str | None = MISSING,
    *,
    username: str = MISSING,
    avatar_url: Any = MISSING,
    tts: bool = False,
    ephemeral: bool = False,
    file: File = MISSING,
    files: list[File] = MISSING,
    attachments: list[Attachment] = MISSING,
    embed: Embed | None = MISSING,
    embeds: list[Embed] = MISSING,
    view: View | None = MISSING,
    poll: Poll | None = MISSING,
    applied_tags: list[Snowflake] = MISSING,
    allowed_mentions: AllowedMentions | None = MISSING,
    previous_allowed_mentions: AllowedMentions | None = None,
    suppress: bool = False,
) -> ExecuteWebhookParameters:
    if files is not MISSING and file is not MISSING:
        raise TypeError("Cannot mix file and files keyword arguments.")
    if embeds is not MISSING and embed is not MISSING:
        raise TypeError("Cannot mix embed and embeds keyword arguments.")

    payload = {}
    if embeds is not MISSING:
        if len(embeds) > 10:
            raise InvalidArgument("embeds has a maximum of 10 elements.")
        payload["embeds"] = [e.to_dict() for e in embeds]

    if embed is not MISSING:
        payload["embeds"] = [] if embed is None else [embed.to_dict()]
    if content is not MISSING:
        payload["content"] = str(content) if content is not None else None
    _attachments = []
    if attachments is not MISSING:
        _attachments = [a.to_dict() for a in attachments]

    if view is not MISSING:
        payload["components"] = view.to_components() if view is not None else []
    if poll is not MISSING:
        payload["poll"] = poll.to_dict()
    payload["tts"] = tts
    if avatar_url:
        payload["avatar_url"] = str(avatar_url)
    if username:
        payload["username"] = username

    flags = MessageFlags(suppress_embeds=suppress, ephemeral=ephemeral)
    payload["flags"] = flags.value

    if applied_tags is not MISSING:
        payload["applied_tags"] = applied_tags

    if allowed_mentions:
        if previous_allowed_mentions is not None:
            payload["allowed_mentions"] = previous_allowed_mentions.merge(
                allowed_mentions
            ).to_dict()
        else:
            payload["allowed_mentions"] = allowed_mentions.to_dict()
    elif previous_allowed_mentions is not None:
        payload["allowed_mentions"] = previous_allowed_mentions.to_dict()

    multipart = []
    multipart_files = []
    if file is not MISSING:
        files = [file]

    if files:
        for index, file in enumerate(files):
            multipart_files.append(
                {
                    "name": f"files[{index}]",
                    "value": file.fp,
                    "filename": file.filename,
                    "content_type": "application/octet-stream",
                }
            )
            _attachments.append(
                {
                    "id": index,
                    "filename": file.filename,
                    "description": file.description,
                }
            )

    if _attachments:
        payload["attachments"] = _attachments

    if multipart_files:
        multipart.append({"name": "payload_json", "value": utils._to_json(payload)})
        payload = None
        multipart += multipart_files

    return ExecuteWebhookParameters(payload=payload, multipart=multipart, files=files)


async_context: ContextVar[AsyncWebhookAdapter] = ContextVar(
    "async_webhook_context", default=AsyncWebhookAdapter()
)


[docs]class PartialWebhookChannel(Hashable): """Represents a partial channel for webhooks. These are typically given for channel follower webhooks. .. versionadded:: 2.0 Attributes ---------- id: :class:`int` The partial channel's ID. name: :class:`str` The partial channel's name. """ __slots__ = ("id", "name") def __init__(self, *, data): self.id = int(data["id"]) self.name = data["name"] def __repr__(self): return f"<PartialWebhookChannel name={self.name!r} id={self.id}>"
[docs]class PartialWebhookGuild(Hashable): """Represents a partial guild for webhooks. These are typically given for channel follower webhooks. .. versionadded:: 2.0 Attributes ---------- id: :class:`int` The partial guild's ID. name: :class:`str` The partial guild's name. """ __slots__ = ("id", "name", "_icon", "_state") def __init__(self, *, data, state): self._state = state self.id = int(data["id"]) self.name = data["name"] self._icon = data["icon"] def __repr__(self): return f"<PartialWebhookGuild name={self.name!r} id={self.id}>" @property def icon(self) -> Asset | None: """Returns the guild's icon asset, if available.""" if self._icon is None: return None return Asset._from_guild_icon(self._state, self.id, self._icon)
class _FriendlyHttpAttributeErrorHelper: __slots__ = () def __getattr__(self, attr): raise AttributeError("PartialWebhookState does not support http methods.") class _WebhookState: __slots__ = ("_parent", "_webhook") def __init__(self, webhook: Any, parent: ConnectionState | _WebhookState | None): self._webhook: Any = webhook self._parent: ConnectionState | None self._parent = None if isinstance(parent, _WebhookState) else parent def _get_guild(self, guild_id): if self._parent is not None: return self._parent._get_guild(guild_id) return None def store_user(self, data): if self._parent is not None: return self._parent.store_user(data) # state parameter is artificial return BaseUser(state=self, data=data) # type: ignore def create_user(self, data): # state parameter is artificial return BaseUser(state=self, data=data) # type: ignore @property def http(self): if self._parent is not None: return self._parent.http # Some data classes assign state.http and that should be kosher # however, using it should result in a late-binding error. return _FriendlyHttpAttributeErrorHelper() def __getattr__(self, attr): if self._parent is not None: return getattr(self._parent, attr) raise AttributeError(f"PartialWebhookState does not support {attr!r}.")
[docs]class WebhookMessage(Message): """Represents a message sent from your webhook. This allows you to edit or delete a message sent by your webhook. This inherits from :class:`discord.Message` with changes to :meth:`edit` and :meth:`delete` to work. .. versionadded:: 1.6 """ _state: _WebhookState
[docs] async def edit( self, content: str | None = MISSING, embeds: list[Embed] = MISSING, embed: Embed | None = MISSING, file: File = MISSING, files: list[File] = MISSING, attachments: list[Attachment] = MISSING, view: View | None = MISSING, allowed_mentions: AllowedMentions | None = None, suppress: bool | None = MISSING, ) -> WebhookMessage: """|coro| Edits the message. .. versionadded:: 1.6 .. versionchanged:: 2.0 The edit is no longer in-place, instead the newly edited message is returned. Parameters ---------- content: Optional[:class:`str`] The content to edit the message with or ``None`` to clear it. embeds: List[:class:`Embed`] A list of embeds to edit the message with. embed: Optional[:class:`Embed`] The embed to edit the message with. ``None`` suppresses the embeds. This should not be mixed with the ``embeds`` parameter. file: :class:`File` The file to upload. This cannot be mixed with ``files`` parameter. .. versionadded:: 2.0 files: List[:class:`File`] A list of files to send with the content. This cannot be mixed with the ``file`` parameter. .. versionadded:: 2.0 attachments: List[:class:`Attachment`] A list of attachments to keep in the message. If ``[]`` is passed then all attachments are removed. .. versionadded:: 2.0 allowed_mentions: :class:`AllowedMentions` Controls the mentions being processed in this message. See :meth:`.abc.Messageable.send` for more information. view: Optional[:class:`~discord.ui.View`] The updated view to update this message with. If ``None`` is passed then the view is removed. .. versionadded:: 2.0 suppress: Optional[:class:`bool`] Whether to suppress embeds for the message. Returns ------- :class:`WebhookMessage` The newly edited message. Raises ------ HTTPException Editing the message failed. Forbidden Edited a message that is not yours. TypeError You specified both ``embed`` and ``embeds`` or ``file`` and ``files`` ValueError The length of ``embeds`` was invalid InvalidArgument There was no token associated with this webhook. """ thread = MISSING if hasattr(self, "_thread_id"): thread = Object(self._thread_id) elif isinstance(self.channel, Thread): thread = Object(self.channel.id) elif isinstance(self.channel, ForumChannel): thread = Object(self.id) if attachments is MISSING: attachments = self.attachments or MISSING if suppress is MISSING: suppress = self.flags.suppress_embeds return await self._state._webhook.edit_message( self.id, content=content, embeds=embeds, embed=embed, file=file, files=files, attachments=attachments, view=view, allowed_mentions=allowed_mentions, thread=thread, suppress=suppress, )
[docs] async def delete(self, *, delay: float | None = None) -> None: """|coro| Deletes the message. Parameters ---------- delay: Optional[:class:`float`] If provided, the number of seconds to wait before deleting the message. The waiting is done in the background and deletion failures are ignored. Raises ------ Forbidden You do not have proper permissions to delete the message. NotFound The message was deleted already. HTTPException Deleting the message failed. """ thread_id: int | None = None if hasattr(self, "_thread_id"): thread_id = self._thread_id elif isinstance(self.channel, Thread): thread_id = self.channel.id if delay is not None: async def inner_call(delay: float = delay): await asyncio.sleep(delay) try: await self._state._webhook.delete_message( self.id, thread_id=thread_id ) except HTTPException: pass asyncio.create_task(inner_call()) else: await self._state._webhook.delete_message(self.id, thread_id=thread_id)
class BaseWebhook(Hashable): __slots__: tuple[str, ...] = ( "id", "type", "guild_id", "channel_id", "token", "auth_token", "user", "name", "_avatar", "source_channel", "source_guild", "_state", ) def __init__( self, data: WebhookPayload | FollowerWebhookPayload, token: str | None = None, state: ConnectionState | None = None, ): self.auth_token: str | None = token self._state: ConnectionState | _WebhookState = state or _WebhookState( self, parent=state ) self._update(data) def _update(self, data: WebhookPayload | FollowerWebhookPayload): self.id = int(data["id"]) self.type = try_enum(WebhookType, int(data["type"])) self.channel_id = utils._get_as_snowflake(data, "channel_id") self.guild_id = utils._get_as_snowflake(data, "guild_id") self.name = data.get("name") self._avatar = data.get("avatar") self.token = data.get("token") user = data.get("user") self.user: BaseUser | User | None = None if user is not None: # state parameter may be _WebhookState self.user = User(state=self._state, data=user) # type: ignore source_channel = data.get("source_channel") if source_channel: source_channel = PartialWebhookChannel(data=source_channel) self.source_channel: PartialWebhookChannel | None = source_channel source_guild = data.get("source_guild") if source_guild: source_guild = PartialWebhookGuild(data=source_guild, state=self._state) self.source_guild: PartialWebhookGuild | None = source_guild def is_partial(self) -> bool: """Whether the webhook is a "partial" webhook. .. versionadded:: 2.0 """ return self.channel_id is None def is_authenticated(self) -> bool: """Whether the webhook is authenticated with a bot token. .. versionadded:: 2.0 """ return self.auth_token is not None @property def guild(self) -> Guild | None: """The guild this webhook belongs to. If this is a partial webhook, then this will always return ``None``. """ return self._state and self._state._get_guild(self.guild_id) @property def channel(self) -> TextChannel | None: """The text channel this webhook belongs to. If this is a partial webhook, then this will always return ``None``. """ guild = self.guild return guild and guild.get_channel(self.channel_id) # type: ignore @property def created_at(self) -> datetime.datetime: """Returns the webhook's creation time in UTC.""" return utils.snowflake_time(self.id) @property def avatar(self) -> Asset: """Returns an :class:`Asset` for the avatar the webhook has. If the webhook does not have a traditional avatar, an asset for the default avatar is returned instead. """ if self._avatar is None: # Default is always blurple apparently return Asset._from_default_avatar(self._state, 0) return Asset._from_avatar(self._state, self.id, self._avatar)
[docs]class Webhook(BaseWebhook): """Represents an asynchronous Discord webhook. Webhooks are a form to send messages to channels in Discord without a bot user or authentication. There are two main ways to use Webhooks. The first is through the ones received by the library such as :meth:`.Guild.webhooks` and :meth:`.TextChannel.webhooks`. The ones received by the library will automatically be bound using the library's internal HTTP session. The second form involves creating a webhook object manually using the :meth:`~.Webhook.from_url` or :meth:`~.Webhook.partial` classmethods. For example, creating a webhook from a URL and using :doc:`aiohttp <aio:index>`: .. code-block:: python3 from discord import Webhook import aiohttp async def foo(): async with aiohttp.ClientSession() as session: webhook = Webhook.from_url('url-here', session=session) await webhook.send('Hello World', username='Foo') For a synchronous counterpart, see :class:`SyncWebhook`. .. container:: operations .. describe:: x == y Checks if two webhooks are equal. .. describe:: x != y Checks if two webhooks are not equal. .. describe:: hash(x) Returns the webhook's hash. .. versionchanged:: 1.4 Webhooks are now comparable and hashable. Attributes ---------- id: :class:`int` The webhook's ID type: :class:`WebhookType` The type of the webhook. .. versionadded:: 1.3 token: Optional[:class:`str`] The authentication token of the webhook. If this is ``None`` then the webhook cannot be used to make requests. guild_id: Optional[:class:`int`] The guild ID this webhook is for. channel_id: Optional[:class:`int`] The channel ID this webhook is for. user: Optional[:class:`abc.User`] The user this webhook was created by. If the webhook was received without authentication then this will be ``None``. name: Optional[:class:`str`] The default name of the webhook. source_guild: Optional[:class:`PartialWebhookGuild`] The guild of the channel that this webhook is following. Only given if :attr:`type` is :attr:`WebhookType.channel_follower`. .. versionadded:: 2.0 source_channel: Optional[:class:`PartialWebhookChannel`] The channel that this webhook is following. Only given if :attr:`type` is :attr:`WebhookType.channel_follower`. .. versionadded:: 2.0 """ __slots__: tuple[str, ...] = ("session", "proxy", "proxy_auth") def __init__( self, data: WebhookPayload | FollowerWebhookPayload, session: aiohttp.ClientSession, proxy: str | None = None, proxy_auth: aiohttp.BasicAuth | None = None, token: str | None = None, state=None, ): super().__init__(data, token, state) self.session = session self.proxy: str | None = proxy self.proxy_auth: aiohttp.BasicAuth | None = proxy_auth def __repr__(self): return f"<Webhook id={self.id!r}>" @property def url(self) -> str: """Returns the webhook's url.""" return f"https://discord.com/api/webhooks/{self.id}/{self.token}"
[docs] @classmethod def partial( cls, id: int, token: str, *, session: aiohttp.ClientSession, proxy: str | None = None, proxy_auth: aiohttp.BasicAuth | None = None, bot_token: str | None = None, ) -> Webhook: """Creates a partial :class:`Webhook`. Parameters ---------- id: :class:`int` The ID of the webhook. token: :class:`str` The authentication token of the webhook. session: :class:`aiohttp.ClientSession` The session to use to send requests with. Note that the library does not manage the session and will not close it. .. versionadded:: 2.0 bot_token: Optional[:class:`str`] The bot authentication token for authenticated requests involving the webhook. .. versionadded:: 2.0 Returns ------- :class:`Webhook` A partial :class:`Webhook`. A partial webhook is just a webhook object with an ID and a token. """ data: WebhookPayload = { "id": id, "type": 1, "token": token, } return cls(data, session, proxy=proxy, proxy_auth=proxy_auth, token=bot_token)
[docs] @classmethod def from_url( cls, url: str, *, session: aiohttp.ClientSession, proxy: str | None = None, proxy_auth: aiohttp.BasicAuth | None = None, bot_token: str | None = None, ) -> Webhook: """Creates a partial :class:`Webhook` from a webhook URL. Parameters ---------- url: :class:`str` The URL of the webhook. session: :class:`aiohttp.ClientSession` The session to use to send requests with. Note that the library does not manage the session and will not close it. .. versionadded:: 2.0 bot_token: Optional[:class:`str`] The bot authentication token for authenticated requests involving the webhook. .. versionadded:: 2.0 Returns ------- :class:`Webhook` A partial :class:`Webhook`. A partial webhook is just a webhook object with an ID and a token. Raises ------ InvalidArgument The URL is invalid. """ m = re.search( r"discord(?:app)?.com/api/webhooks/(?P<id>\d{17,20})/(?P<token>[\w\.\-_]{60,68})", url, ) if m is None: raise InvalidArgument("Invalid webhook URL given.") data: dict[str, Any] = m.groupdict() data["type"] = 1 return cls(data, session, proxy=proxy, proxy_auth=proxy_auth, token=bot_token) # type: ignore
@classmethod def _as_follower(cls, data, *, channel, user) -> Webhook: name = f"{channel.guild} #{channel}" feed: WebhookPayload = { "id": data["webhook_id"], "type": 2, "name": name, "channel_id": channel.id, "guild_id": channel.guild.id, "user": { "username": user.name, "discriminator": user.discriminator, "global_name": user.global_name, "id": user.id, "avatar": user._avatar, }, } state = channel._state http = state.http session = http._HTTPClient__session proxy_auth = http.proxy_auth proxy = http.proxy return cls( feed, session=session, state=state, proxy_auth=proxy_auth, proxy=proxy, token=state.http.token, ) @classmethod def from_state(cls, data, state) -> Webhook: http = state.http session = http._HTTPClient__session proxy_auth = http.proxy_auth proxy = http.proxy return cls( data, session=session, state=state, proxy_auth=proxy_auth, proxy=proxy, token=state.http.token, )
[docs] async def fetch(self, *, prefer_auth: bool = True) -> Webhook: """|coro| Fetches the current webhook. This could be used to get a full webhook from a partial webhook. .. versionadded:: 2.0 .. note:: When fetching with an unauthenticated webhook, i.e. :meth:`is_authenticated` returns ``False``, then the returned webhook does not contain any user information. Parameters ---------- prefer_auth: :class:`bool` Whether to use the bot token over the webhook token if available. Defaults to ``True``. Returns ------- :class:`Webhook` The fetched webhook. Raises ------ HTTPException Could not fetch the webhook NotFound Could not find the webhook by this ID InvalidArgument This webhook does not have a token associated with it. """ adapter = async_context.get() if prefer_auth and self.auth_token: data = await adapter.fetch_webhook( self.id, self.auth_token, session=self.session, proxy=self.proxy, proxy_auth=self.proxy_auth, ) elif self.token: data = await adapter.fetch_webhook_with_token( self.id, self.token, session=self.session, proxy=self.proxy, proxy_auth=self.proxy_auth, ) else: raise InvalidArgument( "This webhook does not have a token associated with it" ) return Webhook( data, session=self.session, proxy=self.proxy, proxy_auth=self.proxy_auth, token=self.auth_token, state=self._state, )
[docs] async def delete(self, *, reason: str | None = None, prefer_auth: bool = True): """|coro| Deletes this Webhook. Parameters ---------- reason: Optional[:class:`str`] The reason for deleting this webhook. Shows up on the audit log. .. versionadded:: 1.4 prefer_auth: :class:`bool` Whether to use the bot token over the webhook token if available. Defaults to ``True``. .. versionadded:: 2.0 Raises ------ HTTPException Deleting the webhook failed. NotFound This webhook does not exist. Forbidden You do not have permissions to delete this webhook. InvalidArgument This webhook does not have a token associated with it. """ if self.token is None and self.auth_token is None: raise InvalidArgument( "This webhook does not have a token associated with it" ) adapter = async_context.get() if prefer_auth and self.auth_token: await adapter.delete_webhook( self.id, token=self.auth_token, session=self.session, proxy=self.proxy, proxy_auth=self.proxy_auth, reason=reason, ) elif self.token: await adapter.delete_webhook_with_token( self.id, self.token, session=self.session, proxy=self.proxy, proxy_auth=self.proxy_auth, reason=reason, )
[docs] async def edit( self, *, reason: str | None = None, name: str | None = MISSING, avatar: bytes | None = MISSING, channel: Snowflake | None = None, prefer_auth: bool = True, ) -> Webhook: """|coro| Edits this Webhook. Parameters ---------- name: Optional[:class:`str`] The webhook's new default name. avatar: Optional[:class:`bytes`] A :term:`py:bytes-like object` representing the webhook's new default avatar. channel: Optional[:class:`abc.Snowflake`] The webhook's new channel. This requires an authenticated webhook. .. versionadded:: 2.0 reason: Optional[:class:`str`] The reason for editing this webhook. Shows up on the audit log. .. versionadded:: 1.4 prefer_auth: :class:`bool` Whether to use the bot token over the webhook token if available. Defaults to ``True``. .. versionadded:: 2.0 Raises ------ HTTPException Editing the webhook failed. NotFound This webhook does not exist. InvalidArgument This webhook does not have a token associated with it, or it tried editing a channel without authentication. """ if self.token is None and self.auth_token is None: raise InvalidArgument( "This webhook does not have a token associated with it" ) payload = {} if name is not MISSING: payload["name"] = str(name) if name is not None else None if avatar is not MISSING: payload["avatar"] = ( utils._bytes_to_base64_data(avatar) if avatar is not None else None ) adapter = async_context.get() data: WebhookPayload | None = None # If a channel is given, always use the authenticated endpoint if channel is not None: if self.auth_token is None: raise InvalidArgument("Editing channel requires authenticated webhook") payload["channel_id"] = channel.id data = await adapter.edit_webhook( self.id, self.auth_token, payload=payload, session=self.session, proxy=self.proxy, proxy_auth=self.proxy_auth, reason=reason, ) if prefer_auth and self.auth_token: data = await adapter.edit_webhook( self.id, self.auth_token, payload=payload, session=self.session, proxy=self.proxy, proxy_auth=self.proxy_auth, reason=reason, ) elif self.token: data = await adapter.edit_webhook_with_token( self.id, self.token, payload=payload, session=self.session, proxy=self.proxy, proxy_auth=self.proxy_auth, reason=reason, ) if data is None: raise RuntimeError("Unreachable code hit: data was not assigned") return Webhook( data, session=self.session, proxy=self.proxy, proxy_auth=self.proxy_auth, token=self.auth_token, state=self._state, )
def _create_message(self, data): state = _WebhookState(self, parent=self._state) # state may be artificial (unlikely at this point...) channel = self.channel or PartialMessageable(state=self._state, id=int(data["channel_id"])) # type: ignore # state is artificial return WebhookMessage(data=data, state=state, channel=channel) # type: ignore @overload async def send( self, content: str = MISSING, *, username: str = MISSING, avatar_url: Any = MISSING, tts: bool = MISSING, ephemeral: bool = MISSING, file: File = MISSING, files: list[File] = MISSING, embed: Embed = MISSING, embeds: list[Embed] = MISSING, allowed_mentions: AllowedMentions = MISSING, view: View = MISSING, poll: Poll = MISSING, thread: Snowflake = MISSING, thread_name: str | None = None, applied_tags: list[Snowflake] = MISSING, wait: Literal[True], delete_after: float = None, ) -> WebhookMessage: ... @overload async def send( self, content: str = MISSING, *, username: str = MISSING, avatar_url: Any = MISSING, tts: bool = MISSING, ephemeral: bool = MISSING, file: File = MISSING, files: list[File] = MISSING, embed: Embed = MISSING, embeds: list[Embed] = MISSING, allowed_mentions: AllowedMentions = MISSING, view: View = MISSING, poll: Poll = MISSING, thread: Snowflake = MISSING, thread_name: str | None = None, applied_tags: list[Snowflake] = MISSING, wait: Literal[False] = ..., delete_after: float = None, ) -> None: ...
[docs] async def send( self, content: str = MISSING, *, username: str = MISSING, avatar_url: Any = MISSING, tts: bool = False, ephemeral: bool = False, file: File = MISSING, files: list[File] = MISSING, embed: Embed = MISSING, embeds: list[Embed] = MISSING, allowed_mentions: AllowedMentions = MISSING, view: View = MISSING, poll: Poll = MISSING, thread: Snowflake = MISSING, thread_name: str | None = None, applied_tags: list[Snowflake] = MISSING, wait: bool = False, delete_after: float = None, ) -> WebhookMessage | None: """|coro| Sends a message using the webhook. The content must be a type that can convert to a string through ``str(content)``. To upload a single file, the ``file`` parameter should be used with a single :class:`File` object. If the ``embed`` parameter is provided, it must be of type :class:`Embed` and it must be a rich embed type. You cannot mix the ``embed`` parameter with the ``embeds`` parameter, which must be a :class:`list` of :class:`Embed` objects to send. Parameters ---------- content: :class:`str` The content of the message to send. wait: :class:`bool` Whether the server should wait before sending a response. This essentially means that the return type of this function changes from ``None`` to a :class:`WebhookMessage` if set to ``True``. If the type of webhook is :attr:`WebhookType.application` then this is always set to ``True``. username: :class:`str` The username to send with this message. If no username is provided then the default username for the webhook is used. avatar_url: :class:`str` The avatar URL to send with this message. If no avatar URL is provided then the default avatar for the webhook is used. If this is not a string then it is explicitly cast using ``str``. tts: :class:`bool` Indicates if the message should be sent using text-to-speech. ephemeral: :class:`bool` Indicates if the message should only be visible to the user. This is only available to :attr:`WebhookType.application` webhooks. If a view is sent with an ephemeral message, and it has no timeout set then the timeout is set to 15 minutes. .. versionadded:: 2.0 file: :class:`File` The file to upload. This cannot be mixed with ``files`` parameter. files: List[:class:`File`] A list of files to send with the content. This cannot be mixed with the ``file`` parameter. embed: :class:`Embed` The rich embed for the content to send. This cannot be mixed with ``embeds`` parameter. embeds: List[:class:`Embed`] A list of embeds to send with the content. Maximum of 10. This cannot be mixed with the ``embed`` parameter. allowed_mentions: :class:`AllowedMentions` Controls the mentions being processed in this message. .. versionadded:: 1.4 view: :class:`discord.ui.View` The view to send with the message. You can only send a view if this webhook is not partial and has state attached. A webhook has state attached if the webhook is managed by the library. .. versionadded:: 2.0 thread: :class:`~discord.abc.Snowflake` The thread to send this webhook to. .. versionadded:: 2.0 thread_name: :class:`str` The name of the thread to create. Only works for forum channels. .. versionadded:: 2.0 applied_tags: List[:class:`Snowflake`] A list of tags to apply to the message. Only works for threads. .. versionadded:: 2.5 delete_after: :class:`float` If provided, the number of seconds to wait in the background before deleting the message we just sent. poll: :class:`Poll` The poll to send. .. versionadded:: 2.6 Returns ------- Optional[:class:`WebhookMessage`] If ``wait`` is ``True`` then the message that was sent, otherwise ``None``. Raises ------ HTTPException Sending the message failed. NotFound This webhook was not found. Forbidden The authorization token for the webhook is incorrect. TypeError You specified both ``embed`` and ``embeds`` or ``file`` and ``files``. ValueError The length of ``embeds`` was invalid. InvalidArgument Either there was no token associated with this webhook, ``ephemeral`` was passed with the improper webhook type, there was no state attached with this webhook when giving it a view, you specified both ``thread_name`` and ``thread``, or ``applied_tags`` was passed with neither ``thread_name`` nor ``thread`` specified. """ if self.token is None: raise InvalidArgument( "This webhook does not have a token associated with it" ) previous_mentions: AllowedMentions | None = getattr( self._state, "allowed_mentions", None ) if content is None: content = MISSING if thread and thread_name: raise InvalidArgument("You cannot specify both a thread and thread_name") if applied_tags and not (thread or thread_name): raise InvalidArgument("You cannot specify applied_tags without a thread") application_webhook = self.type is WebhookType.application if ephemeral and not application_webhook: raise InvalidArgument( "ephemeral messages can only be sent from application webhooks" ) if application_webhook: wait = True if view is not MISSING: if isinstance(self._state, _WebhookState): raise InvalidArgument( "Webhook views require an associated state with the webhook" ) if ephemeral is True and view.timeout is None: view.timeout = 15 * 60.0 if poll is None: poll = MISSING params = handle_message_parameters( content=content, username=username, avatar_url=avatar_url, tts=tts, file=file, files=files, embed=embed, embeds=embeds, ephemeral=ephemeral, view=view, poll=poll, applied_tags=applied_tags, allowed_mentions=allowed_mentions, previous_allowed_mentions=previous_mentions, ) adapter = async_context.get() thread_id: int | None = None if thread is not MISSING: thread_id = thread.id data = await adapter.execute_webhook( self.id, self.token, session=self.session, proxy=self.proxy, proxy_auth=self.proxy_auth, payload=params.payload, multipart=params.multipart, files=params.files, thread_id=thread_id, thread_name=thread_name, wait=wait, ) msg = None if wait: msg = self._create_message(data) if view is not MISSING and not view.is_finished(): message_id = None if msg is None else msg.id view.message = None if msg is None else msg self._state.store_view(view, message_id) if delete_after is not None: async def delete(): await asyncio.sleep(delete_after) await msg.delete() asyncio.ensure_future(delete(), loop=self._state.loop) return msg
[docs] async def fetch_message( self, id: int, *, thread_id: int | None = None ) -> WebhookMessage: """|coro| Retrieves a single :class:`~discord.WebhookMessage` owned by this webhook. .. versionadded:: 2.0 Parameters ---------- id: :class:`int` The message ID to look for. thread_id: Optional[:class:`int`] The ID of the thread that contains the message. Returns ------- :class:`~discord.WebhookMessage` The message asked for. Raises ------ ~discord.NotFound The specified message was not found. ~discord.Forbidden You do not have the permissions required to get a message. ~discord.HTTPException Retrieving the message failed. InvalidArgument There was no token associated with this webhook. """ if self.token is None: raise InvalidArgument( "This webhook does not have a token associated with it" ) adapter = async_context.get() data = await adapter.get_webhook_message( self.id, self.token, id, session=self.session, proxy=self.proxy, proxy_auth=self.proxy_auth, thread_id=thread_id, ) msg = self._create_message(data) if thread_id and isinstance(msg.channel, PartialMessageable): msg._thread_id = thread_id return msg
[docs] async def edit_message( self, message_id: int, *, content: str | None = MISSING, embeds: list[Embed] = MISSING, embed: Embed | None = MISSING, file: File = MISSING, files: list[File] = MISSING, attachments: list[Attachment] = MISSING, view: View | None = MISSING, allowed_mentions: AllowedMentions | None = None, thread: Snowflake | None = MISSING, suppress: bool = False, ) -> WebhookMessage: """|coro| Edits a message owned by this webhook. This is a lower level interface to :meth:`WebhookMessage.edit` in case you only have an ID. .. versionadded:: 1.6 .. versionchanged:: 2.0 The edit is no longer in-place, instead the newly edited message is returned. Parameters ---------- message_id: :class:`int` The message ID to edit. content: Optional[:class:`str`] The content to edit the message with or ``None`` to clear it. embeds: List[:class:`Embed`] A list of embeds to edit the message with. embed: Optional[:class:`Embed`] The embed to edit the message with. ``None`` suppresses the embeds. This should not be mixed with the ``embeds`` parameter. attachments: List[:class:`Attachment`] A list of attachments to keep in the message. If ``[]`` is passed then all attachments are removed. file: :class:`File` The file to upload. This cannot be mixed with ``files`` parameter. .. versionadded:: 2.0 files: List[:class:`File`] A list of files to send with the content. This cannot be mixed with the ``file`` parameter. .. versionadded:: 2.0 allowed_mentions: :class:`AllowedMentions` Controls the mentions being processed in this message. See :meth:`.abc.Messageable.send` for more information. view: Optional[:class:`~discord.ui.View`] The updated view to update this message with. If ``None`` is passed then the view is removed. The webhook must have state attached, similar to :meth:`send`. .. versionadded:: 2.0 thread: Optional[:class:`~discord.abc.Snowflake`] The thread that contains the message. suppress: :class:`bool` Whether to suppress embeds for the message. Returns ------- :class:`WebhookMessage` The newly edited webhook message. Raises ------ HTTPException Editing the message failed. Forbidden Edited a message that is not yours. TypeError You specified both ``embed`` and ``embeds`` or ``file`` and ``files`` ValueError The length of ``embeds`` was invalid InvalidArgument There was no token associated with this webhook or the webhook had no state. """ if self.token is None: raise InvalidArgument( "This webhook does not have a token associated with it" ) if view is not MISSING: if isinstance(self._state, _WebhookState): raise InvalidArgument( "This webhook does not have state associated with it" ) self._state.prevent_view_updates_for(message_id) previous_mentions: AllowedMentions | None = getattr( self._state, "allowed_mentions", None ) params = handle_message_parameters( content=content, file=file, files=files, attachments=attachments, embed=embed, embeds=embeds, view=view, allowed_mentions=allowed_mentions, previous_allowed_mentions=previous_mentions, suppress=suppress, ) thread_id: int | None = None if thread is not MISSING: thread_id = thread.id adapter = async_context.get() data = await adapter.edit_webhook_message( self.id, self.token, message_id, session=self.session, proxy=self.proxy, proxy_auth=self.proxy_auth, thread_id=thread_id, payload=params.payload, multipart=params.multipart, files=params.files, ) message = self._create_message(data) if view and not view.is_finished(): view.message = message self._state.store_view(view, message_id) return message
[docs] async def delete_message( self, message_id: int, *, thread_id: int | None = None ) -> None: """|coro| Deletes a message owned by this webhook. This is a lower level interface to :meth:`WebhookMessage.delete` in case you only have an ID. .. versionadded:: 1.6 Parameters ---------- message_id: :class:`int` The message ID to delete. thread_id: Optional[:class:`int`] The ID of the thread that contains the message. Raises ------ HTTPException Deleting the message failed. Forbidden Deleted a message that is not yours. """ if self.token is None: raise InvalidArgument( "This webhook does not have a token associated with it" ) adapter = async_context.get() await adapter.delete_webhook_message( self.id, self.token, message_id, session=self.session, proxy=self.proxy, proxy_auth=self.proxy_auth, thread_id=thread_id, )