"""
The MIT License (MIT)
Copyright (c) 2015-2021 Rapptz
Copyright (c) 2021-present Pycord Development
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
import asyncio
import datetime
import functools
import inspect
import re
import sys
import types
from collections import OrderedDict
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Callable,
Coroutine,
Generator,
Generic,
TypeVar,
Union,
)
from ..channel import PartialMessageable, _threaded_guild_channel_factory
from ..enums import Enum as DiscordEnum
from ..enums import (
IntegrationType,
InteractionContextType,
MessageType,
SlashCommandOptionType,
try_enum,
)
from ..errors import (
ApplicationCommandError,
ApplicationCommandInvokeError,
CheckFailure,
ClientException,
InvalidArgument,
ValidationError,
)
from ..member import Member
from ..message import Attachment, Message
from ..object import Object
from ..role import Role
from ..threads import Thread
from ..user import User
from ..utils import MISSING, async_all, find, maybe_coroutine, utcnow, warn_deprecated
from .context import ApplicationContext, AutocompleteContext
from .options import Option, OptionChoice
if sys.version_info >= (3, 11):
from typing import Annotated, get_args, get_origin
else:
from typing_extensions import Annotated, get_args, get_origin
__all__ = (
"_BaseCommand",
"ApplicationCommand",
"SlashCommand",
"slash_command",
"application_command",
"user_command",
"message_command",
"command",
"SlashCommandGroup",
"ContextMenuCommand",
"UserCommand",
"MessageCommand",
)
if TYPE_CHECKING:
from typing_extensions import Concatenate, ParamSpec
from .. import Permissions
from ..cog import Cog
from ..ext.commands.cooldowns import CooldownMapping, MaxConcurrency
T = TypeVar("T")
CogT = TypeVar("CogT", bound="Cog")
Coro = TypeVar("Coro", bound=Callable[..., Coroutine[Any, Any, Any]])
if TYPE_CHECKING:
P = ParamSpec("P")
else:
P = TypeVar("P")
def wrap_callback(coro):
from ..ext.commands.errors import CommandError
@functools.wraps(coro)
async def wrapped(*args, **kwargs):
try:
ret = await coro(*args, **kwargs)
except ApplicationCommandError:
raise
except CommandError:
raise
except asyncio.CancelledError:
return
except Exception as exc:
raise ApplicationCommandInvokeError(exc) from exc
return ret
return wrapped
def hooked_wrapped_callback(command, ctx, coro):
from ..ext.commands.errors import CommandError
@functools.wraps(coro)
async def wrapped(arg):
try:
ret = await coro(arg)
except ApplicationCommandError:
raise
except CommandError:
raise
except asyncio.CancelledError:
return
except Exception as exc:
raise ApplicationCommandInvokeError(exc) from exc
finally:
if (
hasattr(command, "_max_concurrency")
and command._max_concurrency is not None
):
await command._max_concurrency.release(ctx)
await command.call_after_hooks(ctx)
return ret
return wrapped
def unwrap_function(function: Callable[..., Any]) -> Callable[..., Any]:
partial = functools.partial
while True:
if hasattr(function, "__wrapped__"):
function = function.__wrapped__
elif isinstance(function, partial):
function = function.func
else:
return function
def _validate_names(obj):
validate_chat_input_name(obj.name)
if obj.name_localizations:
for locale, string in obj.name_localizations.items():
validate_chat_input_name(string, locale=locale)
def _validate_descriptions(obj):
validate_chat_input_description(obj.description)
if obj.description_localizations:
for locale, string in obj.description_localizations.items():
validate_chat_input_description(string, locale=locale)
class _BaseCommand:
__slots__ = ()
[docs]
class ApplicationCommand(_BaseCommand, Generic[CogT, P, T]):
__original_kwargs__: dict[str, Any]
cog = None
def __init__(self, func: Callable, **kwargs) -> None:
from ..ext.commands.cooldowns import BucketType, CooldownMapping, MaxConcurrency
cooldown = getattr(func, "__commands_cooldown__", kwargs.get("cooldown"))
if cooldown is None:
buckets = CooldownMapping(cooldown, BucketType.default)
elif isinstance(cooldown, CooldownMapping):
buckets = cooldown
else:
raise TypeError(
"Cooldown must be a an instance of CooldownMapping or None."
)
self._buckets: CooldownMapping = buckets
max_concurrency = getattr(
func, "__commands_max_concurrency__", kwargs.get("max_concurrency")
)
self._max_concurrency: MaxConcurrency | None = max_concurrency
self._callback = None
self.module = None
self.name: str = kwargs.get("name", func.__name__)
try:
checks = func.__commands_checks__
checks.reverse()
except AttributeError:
checks = kwargs.get("checks", [])
self.checks = checks
self.id: int | None = kwargs.get("id")
self.guild_ids: list[int] | None = kwargs.get("guild_ids", None)
self.parent = kwargs.get("parent")
# Permissions
self.default_member_permissions: Permissions | None = getattr(
func,
"__default_member_permissions__",
kwargs.get("default_member_permissions", None),
)
self.nsfw: bool | None = getattr(func, "__nsfw__", kwargs.get("nsfw", None))
integration_types = getattr(
func, "__integration_types__", kwargs.get("integration_types", None)
)
contexts = getattr(func, "__contexts__", kwargs.get("contexts", None))
guild_only = getattr(func, "__guild_only__", kwargs.get("guild_only", MISSING))
if guild_only is not MISSING:
warn_deprecated(
"guild_only",
"contexts",
"2.6",
reference="https://discord.com/developers/docs/change-log#userinstallable-apps-preview",
)
if contexts and guild_only:
raise InvalidArgument(
"cannot pass both 'contexts' and 'guild_only' to ApplicationCommand"
)
if self.guild_ids and (
(contexts is not None) or guild_only or integration_types
):
raise InvalidArgument(
"the 'contexts' and 'integration_types' parameters are not available for guild commands"
)
if guild_only:
contexts = {InteractionContextType.guild}
self.contexts: set[InteractionContextType] | None = contexts
self.integration_types: set[IntegrationType] | None = integration_types
def __repr__(self) -> str:
return f"<discord.commands.{self.__class__.__name__} name={self.name}>"
def __eq__(self, other) -> bool:
return (
isinstance(other, self.__class__)
and self.qualified_name == other.qualified_name
and self.guild_ids == other.guild_ids
)
async def __call__(self, ctx, *args, **kwargs):
"""|coro|
Calls the command's callback.
This method bypasses all checks that a command has and does not
convert the arguments beforehand, so take care to pass the correct
arguments in.
"""
if self.cog is not None:
return await self.callback(self.cog, ctx, *args, **kwargs)
return await self.callback(ctx, *args, **kwargs)
@property
def callback(
self,
) -> (
Callable[Concatenate[CogT, ApplicationContext, P], Coro[T]]
| Callable[Concatenate[ApplicationContext, P], Coro[T]]
):
return self._callback
@callback.setter
def callback(
self,
function: (
Callable[Concatenate[CogT, ApplicationContext, P], Coro[T]]
| Callable[Concatenate[ApplicationContext, P], Coro[T]]
),
) -> None:
self._callback = function
unwrap = unwrap_function(function)
self.module = unwrap.__module__
@property
def guild_only(self) -> bool:
warn_deprecated(
"guild_only",
"contexts",
"2.6",
reference="https://discord.com/developers/docs/change-log#userinstallable-apps-preview",
)
return InteractionContextType.guild in self.contexts and len(self.contexts) == 1
@guild_only.setter
def guild_only(self, value: bool) -> None:
warn_deprecated(
"guild_only",
"contexts",
"2.6",
reference="https://discord.com/developers/docs/change-log#userinstallable-apps-preview",
)
if value:
self.contexts = {InteractionContextType.guild}
else:
self.contexts = {
InteractionContextType.guild,
InteractionContextType.bot_dm,
InteractionContextType.private_channel,
}
def _prepare_cooldowns(self, ctx: ApplicationContext):
if self._buckets.valid:
current = datetime.datetime.now().timestamp()
bucket = self._buckets.get_bucket(ctx, current) # type: ignore # ctx instead of non-existent message
if bucket is not None:
retry_after = bucket.update_rate_limit(current)
if retry_after:
from ..ext.commands.errors import CommandOnCooldown
raise CommandOnCooldown(bucket, retry_after, self._buckets.type) # type: ignore
async def prepare(self, ctx: ApplicationContext) -> None:
# This should be same across all 3 types
ctx.command = self
if not await self.can_run(ctx):
raise CheckFailure(
f"The check functions for the command {self.name} failed"
)
if self._max_concurrency is not None:
# For this application, context can be duck-typed as a Message
await self._max_concurrency.acquire(ctx) # type: ignore # ctx instead of non-existent message
try:
self._prepare_cooldowns(ctx)
await self.call_before_hooks(ctx)
except:
if self._max_concurrency is not None:
await self._max_concurrency.release(ctx) # type: ignore # ctx instead of non-existent message
raise
[docs]
def is_on_cooldown(self, ctx: ApplicationContext) -> bool:
"""Checks whether the command is currently on cooldown.
.. note::
This uses the current time instead of the interaction time.
Parameters
----------
ctx: :class:`.ApplicationContext`
The invocation context to use when checking the command's cooldown status.
Returns
-------
:class:`bool`
A boolean indicating if the command is on cooldown.
"""
if not self._buckets.valid:
return False
bucket = self._buckets.get_bucket(ctx) # type: ignore
current = utcnow().timestamp()
return bucket.get_tokens(current) == 0
[docs]
def reset_cooldown(self, ctx: ApplicationContext) -> None:
"""Resets the cooldown on this command.
Parameters
----------
ctx: :class:`.ApplicationContext`
The invocation context to reset the cooldown under.
"""
if self._buckets.valid:
bucket = self._buckets.get_bucket(ctx) # type: ignore # ctx instead of non-existent message
bucket.reset()
[docs]
def get_cooldown_retry_after(self, ctx: ApplicationContext) -> float:
"""Retrieves the amount of seconds before this command can be tried again.
.. note::
This uses the current time instead of the interaction time.
Parameters
----------
ctx: :class:`.ApplicationContext`
The invocation context to retrieve the cooldown from.
Returns
-------
:class:`float`
The amount of time left on this command's cooldown in seconds.
If this is ``0.0`` then the command isn't on cooldown.
"""
if self._buckets.valid:
bucket = self._buckets.get_bucket(ctx) # type: ignore
current = utcnow().timestamp()
return bucket.get_retry_after(current)
return 0.0
async def invoke(self, ctx: ApplicationContext) -> None:
await self.prepare(ctx)
injected = hooked_wrapped_callback(self, ctx, self._invoke)
await injected(ctx)
async def can_run(self, ctx: ApplicationContext) -> bool:
if not await ctx.bot.can_run(ctx):
raise CheckFailure(
f"The global check functions for command {self.name} failed."
)
predicates = self.checks
if self.parent is not None:
# parent checks should be run first
predicates = self.parent.checks + predicates
cog = self.cog
if cog is not None:
local_check = cog._get_overridden_method(cog.cog_check)
if local_check is not None:
ret = await maybe_coroutine(local_check, ctx)
if not ret:
return False
if not predicates:
# since we have no checks, then we just return True.
return True
return await async_all(predicate(ctx) for predicate in predicates) # type: ignore
async def dispatch_error(self, ctx: ApplicationContext, error: Exception) -> None:
ctx.command_failed = True
cog = self.cog
try:
coro = self.on_error
except AttributeError:
pass
else:
injected = wrap_callback(coro)
if cog is not None:
await injected(cog, ctx, error)
else:
await injected(ctx, error)
try:
if cog is not None:
local = cog.__class__._get_overridden_method(cog.cog_command_error)
if local is not None:
wrapped = wrap_callback(local)
await wrapped(ctx, error)
finally:
ctx.bot.dispatch("application_command_error", ctx, error)
def _get_signature_parameters(self):
return OrderedDict(inspect.signature(self.callback).parameters)
[docs]
def error(self, coro):
"""A decorator that registers a coroutine as a local error handler.
A local error handler is an :func:`.on_command_error` event limited to
a single command. However, the :func:`.on_command_error` is still
invoked afterwards as the catch-all.
Parameters
----------
coro: :ref:`coroutine <coroutine>`
The coroutine to register as the local error handler.
Raises
------
TypeError
The coroutine passed is not actually a coroutine.
"""
if not asyncio.iscoroutinefunction(coro):
raise TypeError("The error handler must be a coroutine.")
self.on_error = coro
return coro
[docs]
def has_error_handler(self) -> bool:
"""Checks whether the command has an error handler registered."""
return hasattr(self, "on_error")
[docs]
def before_invoke(self, coro):
"""A decorator that registers a coroutine as a pre-invoke hook.
A pre-invoke hook is called directly before the command is
called. This makes it a useful function to set up database
connections or any type of set up required.
This pre-invoke hook takes a sole parameter, an :class:`.ApplicationContext`.
See :meth:`.Bot.before_invoke` for more info.
Parameters
----------
coro: :ref:`coroutine <coroutine>`
The coroutine to register as the pre-invoke hook.
Raises
------
TypeError
The coroutine passed is not actually a coroutine.
"""
if not asyncio.iscoroutinefunction(coro):
raise TypeError("The pre-invoke hook must be a coroutine.")
self._before_invoke = coro
return coro
[docs]
def after_invoke(self, coro):
"""A decorator that registers a coroutine as a post-invoke hook.
A post-invoke hook is called directly after the command is
called. This makes it a useful function to clean-up database
connections or any type of clean up required.
This post-invoke hook takes a sole parameter, an :class:`.ApplicationContext`.
See :meth:`.Bot.after_invoke` for more info.
Parameters
----------
coro: :ref:`coroutine <coroutine>`
The coroutine to register as the post-invoke hook.
Raises
------
TypeError
The coroutine passed is not actually a coroutine.
"""
if not asyncio.iscoroutinefunction(coro):
raise TypeError("The post-invoke hook must be a coroutine.")
self._after_invoke = coro
return coro
async def call_before_hooks(self, ctx: ApplicationContext) -> None:
# now that we're done preparing we can call the pre-command hooks
# first, call the command local hook:
cog = self.cog
if self._before_invoke is not None:
# should be cog if @commands.before_invoke is used
instance = getattr(self._before_invoke, "__self__", cog)
# __self__ only exists for methods, not functions
# however, if @command.before_invoke is used, it will be a function
if instance:
await self._before_invoke(instance, ctx) # type: ignore
else:
await self._before_invoke(ctx) # type: ignore
# call the cog local hook if applicable:
if cog is not None:
hook = cog.__class__._get_overridden_method(cog.cog_before_invoke)
if hook is not None:
await hook(ctx)
# call the bot global hook if necessary
hook = ctx.bot._before_invoke
if hook is not None:
await hook(ctx)
async def call_after_hooks(self, ctx: ApplicationContext) -> None:
cog = self.cog
if self._after_invoke is not None:
instance = getattr(self._after_invoke, "__self__", cog)
if instance:
await self._after_invoke(instance, ctx) # type: ignore
else:
await self._after_invoke(ctx) # type: ignore
# call the cog local hook if applicable:
if cog is not None:
hook = cog.__class__._get_overridden_method(cog.cog_after_invoke)
if hook is not None:
await hook(ctx)
hook = ctx.bot._after_invoke
if hook is not None:
await hook(ctx)
@property
def cooldown(self):
return self._buckets._cooldown
@property
def full_parent_name(self) -> str:
"""Retrieves the fully qualified parent command name.
This the base command name required to execute it. For example,
in ``/one two three`` the parent name would be ``one two``.
"""
entries = []
command = self
while command.parent is not None and hasattr(command.parent, "name"):
command = command.parent
entries.append(command.name)
return " ".join(reversed(entries))
@property
def qualified_name(self) -> str:
"""Retrieves the fully qualified command name.
This is the full parent name with the command name as well.
For example, in ``/one two three`` the qualified name would be
``one two three``.
"""
parent = self.full_parent_name
if parent:
return f"{parent} {self.name}"
else:
return self.name
@property
def qualified_id(self) -> int:
"""Retrieves the fully qualified command ID.
This is the root parent ID. For example, in ``/one two three``
the qualified ID would return ``one.id``.
"""
if self.id is None:
return self.parent.qualified_id
return self.id
def to_dict(self) -> dict[str, Any]:
raise NotImplementedError
def __str__(self) -> str:
return self.qualified_name
def _set_cog(self, cog):
self.cog = cog
[docs]
class SlashCommand(ApplicationCommand):
r"""A class that implements the protocol for a slash command.
These are not created manually, instead they are created via the
decorator or functional interface.
.. versionadded:: 2.0
Attributes
-----------
name: :class:`str`
The name of the command.
callback: :ref:`coroutine <coroutine>`
The coroutine that is executed when the command is called.
description: Optional[:class:`str`]
The description for the command.
guild_ids: Optional[List[:class:`int`]]
The ids of the guilds where this command will be registered.
options: List[:class:`Option`]
The parameters for this command.
parent: Optional[:class:`SlashCommandGroup`]
The parent group that this command belongs to. ``None`` if there
isn't one.
mention: :class:`str`
Returns a string that allows you to mention the slash command.
guild_only: :class:`bool`
Whether the command should only be usable inside a guild.
.. deprecated:: 2.6
Use the :attr:`contexts` parameter instead.
nsfw: :class:`bool`
Whether the command should be restricted to 18+ channels and users.
Apps intending to be listed in the App Directory cannot have NSFW commands.
default_member_permissions: :class:`~discord.Permissions`
The default permissions a member needs to be able to run the command.
cog: Optional[:class:`Cog`]
The cog that this command belongs to. ``None`` if there isn't one.
checks: List[Callable[[:class:`.ApplicationContext`], :class:`bool`]]
A list of predicates that verifies if the command could be executed
with the given :class:`.ApplicationContext` as the sole parameter. If an exception
is necessary to be thrown to signal failure, then one inherited from
:exc:`.ApplicationCommandError` should be used. Note that if the checks fail then
:exc:`.CheckFailure` exception is raised to the :func:`.on_application_command_error`
event.
cooldown: Optional[:class:`~discord.ext.commands.Cooldown`]
The cooldown applied when the command is invoked. ``None`` if the command
doesn't have a cooldown.
name_localizations: Dict[:class:`str`, :class:`str`]
The name localizations for this command. The values of this should be ``"locale": "name"``. See
`here <https://discord.com/developers/docs/reference#locales>`_ for a list of valid locales.
description_localizations: Dict[:class:`str`, :class:`str`]
The description localizations for this command. The values of this should be ``"locale": "description"``.
See `here <https://discord.com/developers/docs/reference#locales>`_ for a list of valid locales.
integration_types: Set[:class:`IntegrationType`]
The type of installation this command should be available to. For instance, if set to
:attr:`IntegrationType.user_install`, the command will only be available to users with
the application installed on their account. Unapplicable for guild commands.
contexts: Set[:class:`InteractionContextType`]
The location where this command can be used. Cannot be set if this is a guild command.
"""
type = 1
def __new__(cls, *args, **kwargs) -> SlashCommand:
self = super().__new__(cls)
self.__original_kwargs__ = kwargs.copy()
return self
def __init__(self, func: Callable, *args, **kwargs) -> None:
super().__init__(func, **kwargs)
if not asyncio.iscoroutinefunction(func):
raise TypeError("Callback must be a coroutine.")
self.callback = func
self.name_localizations: dict[str, str] = kwargs.get(
"name_localizations", MISSING
)
_validate_names(self)
description = kwargs.get("description") or (
inspect.cleandoc(func.__doc__).splitlines()[0]
if func.__doc__ is not None
else "No description provided"
)
self.description: str = description
self.description_localizations: dict[str, str] = kwargs.get(
"description_localizations", MISSING
)
_validate_descriptions(self)
self.attached_to_group: bool = False
self._options_kwargs = kwargs.get("options", [])
self.options: list[Option] = []
self._validate_parameters()
try:
checks = func.__commands_checks__
checks.reverse()
except AttributeError:
checks = kwargs.get("checks", [])
self.checks = checks
self._before_invoke = None
self._after_invoke = None
def _validate_parameters(self):
params = self._get_signature_parameters()
if kwop := self._options_kwargs:
self.options = self._match_option_param_names(params, kwop)
else:
self.options = self._parse_options(params)
def _check_required_params(self, params):
params = iter(params.items())
required_params = (
["self", "context"] if self.attached_to_group or self.cog else ["context"]
)
for p in required_params:
try:
next(params)
except StopIteration:
raise ClientException(
f'Callback for {self.name} command is missing "{p}" parameter.'
)
return params
def _parse_options(self, params, *, check_params: bool = True) -> list[Option]:
if check_params:
params = self._check_required_params(params)
else:
params = iter(params.items())
final_options = []
for p_name, p_obj in params:
option = p_obj.annotation
if option == inspect.Parameter.empty:
option = str
if self._is_typing_annotated(option):
type_hint = get_args(option)[0]
metadata = option.__metadata__
# If multiple Options in metadata, the first will be used.
option_gen = (elem for elem in metadata if isinstance(elem, Option))
option = next(option_gen, Option())
# Handle Optional
if self._is_typing_optional(type_hint):
option.input_type = SlashCommandOptionType.from_datatype(
get_args(type_hint)[0]
)
option.default = None
else:
option.input_type = SlashCommandOptionType.from_datatype(type_hint)
if self._is_typing_union(option):
if self._is_typing_optional(option):
option = Option(option.__args__[0], default=None)
else:
option = Option(option.__args__)
if not isinstance(option, Option):
if isinstance(p_obj.default, Option):
if p_obj.default.input_type is None:
p_obj.default.input_type = SlashCommandOptionType.from_datatype(
option
)
option = p_obj.default
else:
option = Option(option)
if option.default is None and not p_obj.default == inspect.Parameter.empty:
if isinstance(p_obj.default, Option):
pass
elif isinstance(p_obj.default, type) and issubclass(
p_obj.default, (DiscordEnum, Enum)
):
option = Option(p_obj.default)
else:
option.default = p_obj.default
option.required = False
if option.name is None:
option.name = p_name
if option.name != p_name or option._parameter_name is None:
option._parameter_name = p_name
_validate_names(option)
_validate_descriptions(option)
final_options.append(option)
return final_options
def _match_option_param_names(self, params, options):
options = list(options)
params = self._check_required_params(params)
check_annotations: list[Callable[[Option, type], bool]] = [
lambda o, a: o.input_type == SlashCommandOptionType.string
and o.converter is not None, # pass on converters
lambda o, a: isinstance(
o.input_type, SlashCommandOptionType
), # pass on slash cmd option type enums
lambda o, a: isinstance(o._raw_type, tuple) and a == Union[o._raw_type], # type: ignore # union types
lambda o, a: self._is_typing_optional(a)
and not o.required
and o._raw_type in a.__args__, # optional
lambda o, a: isinstance(a, type)
and issubclass(a, o._raw_type), # 'normal' types
]
for o in options:
_validate_names(o)
_validate_descriptions(o)
try:
p_name, p_obj = next(params)
except StopIteration: # not enough params for all the options
raise ClientException("Too many arguments passed to the options kwarg.")
p_obj = p_obj.annotation
if not any(check(o, p_obj) for check in check_annotations):
raise TypeError(
f"Parameter {p_name} does not match input type of {o.name}."
)
o._parameter_name = p_name
left_out_params = OrderedDict()
for k, v in params:
left_out_params[k] = v
options.extend(self._parse_options(left_out_params, check_params=False))
return options
def _is_typing_union(self, annotation):
return getattr(annotation, "__origin__", None) is Union or type(
annotation
) is getattr(
types, "UnionType", Union
) # type: ignore
def _is_typing_optional(self, annotation):
return self._is_typing_union(annotation) and type(None) in annotation.__args__ # type: ignore
def _is_typing_annotated(self, annotation):
return get_origin(annotation) is Annotated
@property
def cog(self):
return getattr(self, "_cog", None)
@cog.setter
def cog(self, value):
old_cog = self.cog
self._cog = value
if (
old_cog is None
and value is not None
or value is None
and old_cog is not None
):
self._validate_parameters()
@property
def is_subcommand(self) -> bool:
return self.parent is not None
@property
def mention(self) -> str:
return f"</{self.qualified_name}:{self.qualified_id}>"
def to_dict(self) -> dict:
as_dict = {
"name": self.name,
"description": self.description,
"options": [o.to_dict() for o in self.options],
}
if self.name_localizations is not MISSING:
as_dict["name_localizations"] = self.name_localizations
if self.description_localizations is not MISSING:
as_dict["description_localizations"] = self.description_localizations
if self.is_subcommand:
as_dict["type"] = SlashCommandOptionType.sub_command.value
if self.nsfw is not None:
as_dict["nsfw"] = self.nsfw
if self.default_member_permissions is not None:
as_dict["default_member_permissions"] = (
self.default_member_permissions.value
)
if not self.guild_ids and not self.is_subcommand:
as_dict["integration_types"] = [it.value for it in self.integration_types]
as_dict["contexts"] = [ctx.value for ctx in self.contexts]
return as_dict
async def _invoke(self, ctx: ApplicationContext) -> None:
# TODO: Parse the args better
kwargs = {}
for arg in ctx.interaction.data.get("options", []):
op = find(lambda x: x.name == arg["name"], self.options)
if op is None:
continue
arg = arg["value"]
# Checks if input_type is user, role or channel
if op.input_type in (
SlashCommandOptionType.user,
SlashCommandOptionType.role,
SlashCommandOptionType.channel,
SlashCommandOptionType.attachment,
SlashCommandOptionType.mentionable,
):
resolved = ctx.interaction.data.get("resolved", {})
if (
op.input_type
in (SlashCommandOptionType.user, SlashCommandOptionType.mentionable)
and (_data := resolved.get("members", {}).get(arg)) is not None
):
# The option type is a user, we resolved a member from the snowflake and assigned it to _data
if (_user_data := resolved.get("users", {}).get(arg)) is not None:
# We resolved the user from the user id
_data["user"] = _user_data
cache_flag = ctx.interaction._state.member_cache_flags.interaction
arg = ctx.guild._get_and_update_member(_data, int(arg), cache_flag)
elif op.input_type is SlashCommandOptionType.mentionable:
if (_data := resolved.get("users", {}).get(arg)) is not None:
arg = User(state=ctx.interaction._state, data=_data)
elif (_data := resolved.get("roles", {}).get(arg)) is not None:
arg = Role(
state=ctx.interaction._state, data=_data, guild=ctx.guild
)
else:
arg = Object(id=int(arg))
elif (
_data := resolved.get(f"{op.input_type.name}s", {}).get(arg)
) is not None:
if op.input_type is SlashCommandOptionType.channel and (
int(arg) in ctx.guild._channels
or int(arg) in ctx.guild._threads
):
arg = ctx.guild.get_channel_or_thread(int(arg))
_data["_invoke_flag"] = True
(
arg._update(_data)
if isinstance(arg, Thread)
else arg._update(ctx.guild, _data)
)
else:
obj_type = None
kw = {}
if op.input_type is SlashCommandOptionType.user:
obj_type = User
elif op.input_type is SlashCommandOptionType.role:
obj_type = Role
kw["guild"] = ctx.guild
elif op.input_type is SlashCommandOptionType.channel:
# NOTE:
# This is a fallback in case the channel/thread is not found in the
# guild's channels/threads. For channels, if this fallback occurs, at the very minimum,
# permissions will be incorrect due to a lack of permission_overwrite data.
# For threads, if this fallback occurs, info like thread owner id, message count,
# flags, and more will be missing due to a lack of data sent by Discord.
obj_type = _threaded_guild_channel_factory(_data["type"])[0]
kw["guild"] = ctx.guild
elif op.input_type is SlashCommandOptionType.attachment:
obj_type = Attachment
arg = obj_type(state=ctx.interaction._state, data=_data, **kw)
else:
# We couldn't resolve the object, so we just return an empty object
arg = Object(id=int(arg))
elif (
op.input_type == SlashCommandOptionType.string
and (converter := op.converter) is not None
):
from discord.ext.commands import Converter
if isinstance(converter, Converter):
if isinstance(converter, type):
arg = await converter().convert(ctx, arg)
else:
arg = await converter.convert(ctx, arg)
elif op._raw_type in (
SlashCommandOptionType.integer,
SlashCommandOptionType.number,
SlashCommandOptionType.string,
SlashCommandOptionType.boolean,
):
pass
elif issubclass(op._raw_type, Enum):
if isinstance(arg, str) and arg.isdigit():
try:
arg = op._raw_type(int(arg))
except ValueError:
arg = op._raw_type(arg)
elif choice := find(lambda c: c.value == arg, op.choices):
arg = getattr(op._raw_type, choice.name)
kwargs[op._parameter_name] = arg
for o in self.options:
if o._parameter_name not in kwargs:
kwargs[o._parameter_name] = o.default
if self.cog is not None:
await self.callback(self.cog, ctx, **kwargs)
elif self.parent is not None and self.attached_to_group is True:
await self.callback(self.parent, ctx, **kwargs)
else:
await self.callback(ctx, **kwargs)
async def invoke_autocomplete_callback(self, ctx: AutocompleteContext):
values = {i.name: i.default for i in self.options}
for op in ctx.interaction.data.get("options", []):
if op.get("focused", False):
option = find(lambda o: o.name == op["name"], self.options)
values.update(
{i["name"]: i["value"] for i in ctx.interaction.data["options"]}
)
ctx.command = self
ctx.focused = option
ctx.value = op.get("value")
ctx.options = values
if len(inspect.signature(option.autocomplete).parameters) == 2:
instance = getattr(option.autocomplete, "__self__", ctx.cog)
result = option.autocomplete(instance, ctx)
else:
result = option.autocomplete(ctx)
if asyncio.iscoroutinefunction(option.autocomplete):
result = await result
choices = [
o if isinstance(o, OptionChoice) else OptionChoice(o)
for o in result
][:25]
return await ctx.interaction.response.send_autocomplete_result(
choices=choices
)
[docs]
def copy(self):
"""Creates a copy of this command.
Returns
-------
:class:`SlashCommand`
A new instance of this command.
"""
ret = self.__class__(self.callback, **self.__original_kwargs__)
return self._ensure_assignment_on_copy(ret)
def _ensure_assignment_on_copy(self, other):
other._before_invoke = self._before_invoke
other._after_invoke = self._after_invoke
if self.checks != other.checks:
other.checks = self.checks.copy()
# if self._buckets.valid and not other._buckets.valid:
# other._buckets = self._buckets.copy()
# if self._max_concurrency != other._max_concurrency:
# # _max_concurrency won't be None at this point
# other._max_concurrency = self._max_concurrency.copy() # type: ignore
try:
other.on_error = self.on_error
except AttributeError:
pass
return other
def _update_copy(self, kwargs: dict[str, Any]):
if kwargs:
kw = kwargs.copy()
kw.update(self.__original_kwargs__)
copy = self.__class__(self.callback, **kw)
return self._ensure_assignment_on_copy(copy)
else:
return self.copy()
[docs]
class SlashCommandGroup(ApplicationCommand):
r"""A class that implements the protocol for a slash command group.
These can be created manually, but they should be created via the
decorator or functional interface.
Attributes
-----------
name: :class:`str`
The name of the command.
description: Optional[:class:`str`]
The description for the command.
guild_ids: Optional[List[:class:`int`]]
The ids of the guilds where this command will be registered.
parent: Optional[:class:`SlashCommandGroup`]
The parent group that this group belongs to. ``None`` if there
isn't one.
guild_only: :class:`bool`
Whether the command should only be usable inside a guild.
.. deprecated:: 2.6
Use the :attr:`contexts` parameter instead.
nsfw: :class:`bool`
Whether the command should be restricted to 18+ channels and users.
Apps intending to be listed in the App Directory cannot have NSFW commands.
default_member_permissions: :class:`~discord.Permissions`
The default permissions a member needs to be able to run the command.
checks: List[Callable[[:class:`.ApplicationContext`], :class:`bool`]]
A list of predicates that verifies if the command could be executed
with the given :class:`.ApplicationContext` as the sole parameter. If an exception
is necessary to be thrown to signal failure, then one inherited from
:exc:`.ApplicationCommandError` should be used. Note that if the checks fail then
:exc:`.CheckFailure` exception is raised to the :func:`.on_application_command_error`
event.
name_localizations: Dict[:class:`str`, :class:`str`]
The name localizations for this command. The values of this should be ``"locale": "name"``. See
`here <https://discord.com/developers/docs/reference#locales>`_ for a list of valid locales.
description_localizations: Dict[:class:`str`, :class:`str`]
The description localizations for this command. The values of this should be ``"locale": "description"``.
See `here <https://discord.com/developers/docs/reference#locales>`_ for a list of valid locales.
integration_types: Set[:class:`IntegrationType`]
The type of installation this command should be available to. For instance, if set to
:attr:`IntegrationType.user_install`, the command will only be available to users with
the application installed on their account. Unapplicable for guild commands.
contexts: Set[:class:`InteractionContextType`]
The location where this command can be used. Unapplicable for guild commands.
"""
__initial_commands__: list[SlashCommand | SlashCommandGroup]
type = 1
def __new__(cls, *args, **kwargs) -> SlashCommandGroup:
self = super().__new__(cls)
self.__original_kwargs__ = kwargs.copy()
self.__initial_commands__ = []
for i, c in cls.__dict__.items():
if isinstance(c, type) and SlashCommandGroup in c.__bases__:
c = c(
c.__name__,
(
inspect.cleandoc(cls.__doc__).splitlines()[0]
if cls.__doc__ is not None
else "No description provided"
),
)
if isinstance(c, (SlashCommand, SlashCommandGroup)):
c.parent = self
c.attached_to_group = True
self.__initial_commands__.append(c)
return self
def __init__(
self,
name: str,
description: str | None = None,
guild_ids: list[int] | None = None,
parent: SlashCommandGroup | None = None,
cooldown: CooldownMapping | None = None,
max_concurrency: MaxConcurrency | None = None,
**kwargs,
) -> None:
self.name = str(name)
self.description = description or "No description provided"
validate_chat_input_name(self.name)
validate_chat_input_description(self.description)
self.input_type = SlashCommandOptionType.sub_command_group
self.subcommands: list[SlashCommand | SlashCommandGroup] = (
self.__initial_commands__
)
self.guild_ids = guild_ids
self.parent = parent
self.attached_to_group: bool = False
self.checks = kwargs.get("checks", [])
self._before_invoke = None
self._after_invoke = None
self.cog = None
self.id = None
# Permissions
self.default_member_permissions: Permissions | None = kwargs.get(
"default_member_permissions", None
)
self.nsfw: bool | None = kwargs.get("nsfw", None)
integration_types = kwargs.get("integration_types", None)
contexts = kwargs.get("contexts", None)
guild_only = kwargs.get("guild_only", MISSING)
if guild_only is not MISSING:
warn_deprecated("guild_only", "contexts", "2.6")
if contexts and guild_only:
raise InvalidArgument(
"cannot pass both 'contexts' and 'guild_only' to ApplicationCommand"
)
if self.guild_ids and (
(contexts is not None) or guild_only or integration_types
):
raise InvalidArgument(
"the 'contexts' and 'integration_types' parameters are not available for guild commands"
)
# These are set to None and their defaults are then set when added to the bot
self.contexts: set[InteractionContextType] | None = contexts
if guild_only:
self.guild_only: bool | None = guild_only
self.integration_types: set[IntegrationType] | None = integration_types
self.name_localizations: dict[str, str] = kwargs.get(
"name_localizations", MISSING
)
self.description_localizations: dict[str, str] = kwargs.get(
"description_localizations", MISSING
)
# similar to ApplicationCommand
from ..ext.commands.cooldowns import BucketType, CooldownMapping, MaxConcurrency
# no need to getattr, since slash cmds groups cant be created using a decorator
if cooldown is None:
buckets = CooldownMapping(cooldown, BucketType.default)
elif isinstance(cooldown, CooldownMapping):
buckets = cooldown
else:
raise TypeError(
"Cooldown must be a an instance of CooldownMapping or None."
)
self._buckets: CooldownMapping = buckets
# no need to getattr, since slash cmds groups cant be created using a decorator
if max_concurrency is not None and not isinstance(
max_concurrency, MaxConcurrency
):
raise TypeError(
"max_concurrency must be an instance of MaxConcurrency or None"
)
self._max_concurrency: MaxConcurrency | None = max_concurrency
@property
def module(self) -> str | None:
return self.__module__
@property
def guild_only(self) -> bool:
warn_deprecated("guild_only", "contexts", "2.6")
return InteractionContextType.guild in self.contexts and len(self.contexts) == 1
@guild_only.setter
def guild_only(self, value: bool) -> None:
warn_deprecated("guild_only", "contexts", "2.6")
if value:
self.contexts = {InteractionContextType.guild}
else:
self.contexts = {
InteractionContextType.guild,
InteractionContextType.bot_dm,
InteractionContextType.private_channel,
}
def to_dict(self) -> dict:
as_dict = {
"name": self.name,
"description": self.description,
"options": [c.to_dict() for c in self.subcommands],
}
if self.name_localizations is not MISSING:
as_dict["name_localizations"] = self.name_localizations
if self.description_localizations is not MISSING:
as_dict["description_localizations"] = self.description_localizations
if self.parent is not None:
as_dict["type"] = self.input_type.value
if self.nsfw is not None:
as_dict["nsfw"] = self.nsfw
if self.default_member_permissions is not None:
as_dict["default_member_permissions"] = (
self.default_member_permissions.value
)
if not self.guild_ids and self.parent is None:
as_dict["integration_types"] = [it.value for it in self.integration_types]
as_dict["contexts"] = [ctx.value for ctx in self.contexts]
return as_dict
def add_command(self, command: SlashCommand | SlashCommandGroup) -> None:
if command.cog is None and self.cog is not None:
command.cog = self.cog
self.subcommands.append(command)
def command(
self, cls: type[T] = SlashCommand, **kwargs
) -> Callable[[Callable], SlashCommand]:
def wrap(func) -> T:
command = cls(func, parent=self, **kwargs)
self.add_command(command)
return command
return wrap
[docs]
def create_subgroup(
self,
name: str,
description: str | None = None,
guild_ids: list[int] | None = None,
**kwargs,
) -> SlashCommandGroup:
"""
Creates a new subgroup for this SlashCommandGroup.
Parameters
----------
name: :class:`str`
The name of the group to create.
description: Optional[:class:`str`]
The description of the group to create.
guild_ids: Optional[List[:class:`int`]]
A list of the IDs of each guild this group should be added to, making it a guild command.
This will be a global command if ``None`` is passed.
guild_only: :class:`bool`
Whether the command should only be usable inside a guild.
nsfw: :class:`bool`
Whether the command should be restricted to 18+ channels and users.
Apps intending to be listed in the App Directory cannot have NSFW commands.
default_member_permissions: :class:`~discord.Permissions`
The default permissions a member needs to be able to run the command.
checks: List[Callable[[:class:`.ApplicationContext`], :class:`bool`]]
A list of predicates that verifies if the command could be executed
with the given :class:`.ApplicationContext` as the sole parameter. If an exception
is necessary to be thrown to signal failure, then one inherited from
:exc:`.ApplicationCommandError` should be used. Note that if the checks fail then
:exc:`.CheckFailure` exception is raised to the :func:`.on_application_command_error`
event.
name_localizations: Dict[:class:`str`, :class:`str`]
The name localizations for this command. The values of this should be ``"locale": "name"``. See
`here <https://discord.com/developers/docs/reference#locales>`_ for a list of valid locales.
description_localizations: Dict[:class:`str`, :class:`str`]
The description localizations for this command. The values of this should be ``"locale": "description"``.
See `here <https://discord.com/developers/docs/reference#locales>`_ for a list of valid locales.
Returns
-------
SlashCommandGroup
The slash command group that was created.
"""
if self.parent is not None:
raise Exception("A subcommand group cannot be added to a subcommand group")
sub_command_group = SlashCommandGroup(
name, description, guild_ids, parent=self, **kwargs
)
self.subcommands.append(sub_command_group)
return sub_command_group
[docs]
def subgroup(
self,
name: str | None = None,
description: str | None = None,
guild_ids: list[int] | None = None,
) -> Callable[[type[SlashCommandGroup]], SlashCommandGroup]:
"""A shortcut decorator that initializes the provided subclass of :class:`.SlashCommandGroup`
as a subgroup.
.. versionadded:: 2.0
Parameters
----------
name: Optional[:class:`str`]
The name of the group to create. This will resolve to the name of the decorated class if ``None`` is passed.
description: Optional[:class:`str`]
The description of the group to create.
guild_ids: Optional[List[:class:`int`]]
A list of the IDs of each guild this group should be added to, making it a guild command.
This will be a global command if ``None`` is passed.
Returns
-------
Callable[[Type[SlashCommandGroup]], SlashCommandGroup]
The slash command group that was created.
"""
def inner(cls: type[SlashCommandGroup]) -> SlashCommandGroup:
group = cls(
name or cls.__name__,
description
or (
inspect.cleandoc(cls.__doc__).splitlines()[0]
if cls.__doc__ is not None
else "No description provided"
),
guild_ids=guild_ids,
parent=self,
)
self.add_command(group)
return group
return inner
async def _invoke(self, ctx: ApplicationContext) -> None:
option = ctx.interaction.data["options"][0]
resolved = ctx.interaction.data.get("resolved", None)
command = find(lambda x: x.name == option["name"], self.subcommands)
option["resolved"] = resolved
ctx.interaction.data = option
await command.invoke(ctx)
async def invoke_autocomplete_callback(self, ctx: AutocompleteContext) -> None:
option = ctx.interaction.data["options"][0]
command = find(lambda x: x.name == option["name"], self.subcommands)
ctx.interaction.data = option
await command.invoke_autocomplete_callback(ctx)
async def call_before_hooks(self, ctx: ApplicationContext) -> None:
# only call local hooks
cog = self.cog
if self._before_invoke is not None:
# should be cog if @commands.before_invoke is used
instance = getattr(self._before_invoke, "__self__", cog)
# __self__ only exists for methods, not functions
# however, if @command.before_invoke is used, it will be a function
if instance:
await self._before_invoke(instance, ctx) # type: ignore
else:
await self._before_invoke(ctx) # type: ignore
async def call_after_hooks(self, ctx: ApplicationContext) -> None:
cog = self.cog
if self._after_invoke is not None:
instance = getattr(self._after_invoke, "__self__", cog)
if instance:
await self._after_invoke(instance, ctx) # type: ignore
else:
await self._after_invoke(ctx) # type: ignore
[docs]
def walk_commands(self) -> Generator[SlashCommand | SlashCommandGroup]:
"""An iterator that recursively walks through all slash commands and groups in this group.
Yields
------
:class:`.SlashCommand` | :class:`.SlashCommandGroup`
A nested slash command or slash command group from the group.
"""
for command in self.subcommands:
if isinstance(command, SlashCommandGroup):
yield from command.walk_commands()
yield command
[docs]
def copy(self):
"""Creates a copy of this command group.
Returns
-------
:class:`SlashCommandGroup`
A new instance of this command group.
"""
ret = self.__class__(
name=self.name,
description=self.description,
**{
param: value
for param, value in self.__original_kwargs__.items()
if param not in ("name", "description")
},
)
return self._ensure_assignment_on_copy(ret)
def _ensure_assignment_on_copy(self, other):
other.parent = self.parent
other._before_invoke = self._before_invoke
other._after_invoke = self._after_invoke
if self.subcommands != other.subcommands:
other.subcommands = self.subcommands.copy()
if self.checks != other.checks:
other.checks = self.checks.copy()
return other
def _update_copy(self, kwargs: dict[str, Any]):
if kwargs:
kw = kwargs.copy()
kw.update(self.__original_kwargs__)
copy = self.__class__(**kw)
return self._ensure_assignment_on_copy(copy)
else:
return self.copy()
def _set_cog(self, cog):
super()._set_cog(cog)
for subcommand in self.subcommands:
subcommand._set_cog(cog)
class ContextMenuCommand(ApplicationCommand):
r"""A class that implements the protocol for context menu commands.
These are not created manually, instead they are created via the
decorator or functional interface.
.. versionadded:: 2.0
Attributes
-----------
name: :class:`str`
The name of the command.
callback: :ref:`coroutine <coroutine>`
The coroutine that is executed when the command is called.
guild_ids: Optional[List[:class:`int`]]
The ids of the guilds where this command will be registered.
guild_only: :class:`bool`
Whether the command should only be usable inside a guild.
.. deprecated:: 2.6
Use the ``contexts`` parameter instead.
nsfw: :class:`bool`
Whether the command should be restricted to 18+ channels and users.
Apps intending to be listed in the App Directory cannot have NSFW commands.
default_member_permissions: :class:`~discord.Permissions`
The default permissions a member needs to be able to run the command.
cog: Optional[:class:`Cog`]
The cog that this command belongs to. ``None`` if there isn't one.
checks: List[Callable[[:class:`.ApplicationContext`], :class:`bool`]]
A list of predicates that verifies if the command could be executed
with the given :class:`.ApplicationContext` as the sole parameter. If an exception
is necessary to be thrown to signal failure, then one inherited from
:exc:`.ApplicationCommandError` should be used. Note that if the checks fail then
:exc:`.CheckFailure` exception is raised to the :func:`.on_application_command_error`
event.
cooldown: Optional[:class:`~discord.ext.commands.Cooldown`]
The cooldown applied when the command is invoked. ``None`` if the command
doesn't have a cooldown.
name_localizations: Dict[:class:`str`, :class:`str`]
The name localizations for this command. The values of this should be ``"locale": "name"``. See
`here <https://discord.com/developers/docs/reference#locales>`_ for a list of valid locales.
integration_types: Set[:class:`IntegrationType`]
The installation contexts where this command is available. Unapplicable for guild commands.
contexts: Set[:class:`InteractionContextType`]
The interaction contexts where this command is available. Unapplicable for guild commands.
"""
def __new__(cls, *args, **kwargs) -> ContextMenuCommand:
self = super().__new__(cls)
self.__original_kwargs__ = kwargs.copy()
return self
def __init__(self, func: Callable, *args, **kwargs) -> None:
super().__init__(func, **kwargs)
if not asyncio.iscoroutinefunction(func):
raise TypeError("Callback must be a coroutine.")
self.callback = func
self.name_localizations: dict[str, str] = kwargs.get(
"name_localizations", MISSING
)
# Discord API doesn't support setting descriptions for context menu commands, so it must be empty
self.description = ""
if not isinstance(self.name, str):
raise TypeError("Name of a command must be a string.")
self.cog = None
self.id = None
self._before_invoke = None
self._after_invoke = None
self.validate_parameters()
# Context Menu commands can't have parents
self.parent = None
def validate_parameters(self):
params = self._get_signature_parameters()
if list(params.items())[0][0] == "self":
temp = list(params.items())
temp.pop(0)
params = dict(temp)
params = iter(params)
# next we have the 'ctx' as the next parameter
try:
next(params)
except StopIteration:
raise ClientException(
f'Callback for {self.name} command is missing "ctx" parameter.'
)
# next we have the 'user/message' as the next parameter
try:
next(params)
except StopIteration:
cmd = "user" if type(self) == UserCommand else "message"
raise ClientException(
f'Callback for {self.name} command is missing "{cmd}" parameter.'
)
# next there should be no more parameters
try:
next(params)
raise ClientException(
f"Callback for {self.name} command has too many parameters."
)
except StopIteration:
pass
@property
def qualified_name(self):
return self.name
def to_dict(self) -> dict[str, str | int]:
as_dict = {
"name": self.name,
"description": self.description,
"type": self.type,
}
if not self.guild_ids:
as_dict["integration_types"] = [it.value for it in self.integration_types]
as_dict["contexts"] = [ctx.value for ctx in self.contexts]
if self.nsfw is not None:
as_dict["nsfw"] = self.nsfw
if self.default_member_permissions is not None:
as_dict["default_member_permissions"] = (
self.default_member_permissions.value
)
if self.name_localizations:
as_dict["name_localizations"] = self.name_localizations
return as_dict
[docs]
class UserCommand(ContextMenuCommand):
r"""A class that implements the protocol for user context menu commands.
These are not created manually, instead they are created via the
decorator or functional interface.
Attributes
-----------
name: :class:`str`
The name of the command.
callback: :ref:`coroutine <coroutine>`
The coroutine that is executed when the command is called.
guild_ids: Optional[List[:class:`int`]]
The ids of the guilds where this command will be registered.
guild_only: :class:`bool`
Whether the command should only be usable inside a guild.
.. deprecated:: 2.6
Use the ``contexts`` parameter instead.
nsfw: :class:`bool`
Whether the command should be restricted to 18+ channels and users.
Apps intending to be listed in the App Directory cannot have NSFW commands.
default_member_permissions: :class:`~discord.Permissions`
The default permissions a member needs to be able to run the command.
cog: Optional[:class:`Cog`]
The cog that this command belongs to. ``None`` if there isn't one.
checks: List[Callable[[:class:`.ApplicationContext`], :class:`bool`]]
A list of predicates that verifies if the command could be executed
with the given :class:`.ApplicationContext` as the sole parameter. If an exception
is necessary to be thrown to signal failure, then one inherited from
:exc:`.ApplicationCommandError` should be used. Note that if the checks fail then
:exc:`.CheckFailure` exception is raised to the :func:`.on_application_command_error`
event.
cooldown: Optional[:class:`~discord.ext.commands.Cooldown`]
The cooldown applied when the command is invoked. ``None`` if the command
doesn't have a cooldown.
name_localizations: Dict[:class:`str`, :class:`str`]
The name localizations for this command. The values of this should be ``"locale": "name"``. See
`here <https://discord.com/developers/docs/reference#locales>`_ for a list of valid locales.
integration_types: Set[:class:`IntegrationType`]
The installation contexts where this command is available. Unapplicable for guild commands.
contexts: Set[:class:`InteractionContextType`]
The interaction contexts where this command is available. Unapplicable for guild commands.
"""
type = 2
def __new__(cls, *args, **kwargs) -> UserCommand:
self = super().__new__(cls)
self.__original_kwargs__ = kwargs.copy()
return self
async def _invoke(self, ctx: ApplicationContext) -> None:
if "members" not in ctx.interaction.data["resolved"]:
_data = ctx.interaction.data["resolved"]["users"]
for i, v in _data.items():
v["id"] = int(i)
user = v
target = User(state=ctx.interaction._state, data=user)
else:
_data = ctx.interaction.data["resolved"]["members"]
for i, v in _data.items():
v["id"] = int(i)
member = v
_data = ctx.interaction.data["resolved"]["users"]
for i, v in _data.items():
v["id"] = int(i)
user = v
member["user"] = user
cache_flag = ctx.interaction._state.member_cache_flags.interaction
target = ctx.guild._get_and_update_member(member, user["id"], cache_flag)
if self.cog is not None:
await self.callback(self.cog, ctx, target)
else:
await self.callback(ctx, target)
[docs]
def copy(self):
"""Creates a copy of this command.
Returns
-------
:class:`UserCommand`
A new instance of this command.
"""
ret = self.__class__(self.callback, **self.__original_kwargs__)
return self._ensure_assignment_on_copy(ret)
def _ensure_assignment_on_copy(self, other):
other._before_invoke = self._before_invoke
other._after_invoke = self._after_invoke
if self.checks != other.checks:
other.checks = self.checks.copy()
# if self._buckets.valid and not other._buckets.valid:
# other._buckets = self._buckets.copy()
# if self._max_concurrency != other._max_concurrency:
# # _max_concurrency won't be None at this point
# other._max_concurrency = self._max_concurrency.copy() # type: ignore
try:
other.on_error = self.on_error
except AttributeError:
pass
return other
def _update_copy(self, kwargs: dict[str, Any]):
if kwargs:
kw = kwargs.copy()
kw.update(self.__original_kwargs__)
copy = self.__class__(self.callback, **kw)
return self._ensure_assignment_on_copy(copy)
else:
return self.copy()
[docs]
class MessageCommand(ContextMenuCommand):
r"""A class that implements the protocol for message context menu commands.
These are not created manually, instead they are created via the
decorator or functional interface.
Attributes
-----------
name: :class:`str`
The name of the command.
callback: :ref:`coroutine <coroutine>`
The coroutine that is executed when the command is called.
guild_ids: Optional[List[:class:`int`]]
The ids of the guilds where this command will be registered.
guild_only: :class:`bool`
Whether the command should only be usable inside a guild.
.. deprecated:: 2.6
Use the ``contexts`` parameter instead.
nsfw: :class:`bool`
Whether the command should be restricted to 18+ channels and users.
Apps intending to be listed in the App Directory cannot have NSFW commands.
default_member_permissions: :class:`~discord.Permissions`
The default permissions a member needs to be able to run the command.
cog: Optional[:class:`Cog`]
The cog that this command belongs to. ``None`` if there isn't one.
checks: List[Callable[[:class:`.ApplicationContext`], :class:`bool`]]
A list of predicates that verifies if the command could be executed
with the given :class:`.ApplicationContext` as the sole parameter. If an exception
is necessary to be thrown to signal failure, then one inherited from
:exc:`.ApplicationCommandError` should be used. Note that if the checks fail then
:exc:`.CheckFailure` exception is raised to the :func:`.on_application_command_error`
event.
cooldown: Optional[:class:`~discord.ext.commands.Cooldown`]
The cooldown applied when the command is invoked. ``None`` if the command
doesn't have a cooldown.
name_localizations: Dict[:class:`str`, :class:`str`]
The name localizations for this command. The values of this should be ``"locale": "name"``. See
`here <https://discord.com/developers/docs/reference#locales>`_ for a list of valid locales.
integration_types: Set[:class:`IntegrationType`]
The installation contexts where this command is available. Unapplicable for guild commands.
contexts: Set[:class:`InteractionContextType`]
The interaction contexts where this command is available. Unapplicable for guild commands.
"""
type = 3
def __new__(cls, *args, **kwargs) -> MessageCommand:
self = super().__new__(cls)
self.__original_kwargs__ = kwargs.copy()
return self
async def _invoke(self, ctx: ApplicationContext):
_data = ctx.interaction.data["resolved"]["messages"]
for i, v in _data.items():
v["id"] = int(i)
message = v
channel = ctx.interaction.channel
if channel.id != int(message["channel_id"]):
# we got weird stuff going on, make up a channel
channel = PartialMessageable(
state=ctx.interaction._state, id=int(message["channel_id"])
)
target = Message(state=ctx.interaction._state, channel=channel, data=message)
if self.cog is not None:
await self.callback(self.cog, ctx, target)
else:
await self.callback(ctx, target)
[docs]
def copy(self):
"""Creates a copy of this command.
Returns
-------
:class:`MessageCommand`
A new instance of this command.
"""
ret = self.__class__(self.callback, **self.__original_kwargs__)
return self._ensure_assignment_on_copy(ret)
def _ensure_assignment_on_copy(self, other):
other._before_invoke = self._before_invoke
other._after_invoke = self._after_invoke
if self.checks != other.checks:
other.checks = self.checks.copy()
# if self._buckets.valid and not other._buckets.valid:
# other._buckets = self._buckets.copy()
# if self._max_concurrency != other._max_concurrency:
# # _max_concurrency won't be None at this point
# other._max_concurrency = self._max_concurrency.copy() # type: ignore
try:
other.on_error = self.on_error
except AttributeError:
pass
return other
def _update_copy(self, kwargs: dict[str, Any]):
if kwargs:
kw = kwargs.copy()
kw.update(self.__original_kwargs__)
copy = self.__class__(self.callback, **kw)
return self._ensure_assignment_on_copy(copy)
else:
return self.copy()
[docs]
def slash_command(**kwargs):
"""Decorator for slash commands that invokes :func:`application_command`.
.. versionadded:: 2.0
Returns
-------
Callable[..., :class:`.SlashCommand`]
A decorator that converts the provided method into a :class:`.SlashCommand`.
"""
return application_command(cls=SlashCommand, **kwargs)
[docs]
def user_command(**kwargs):
"""Decorator for user commands that invokes :func:`application_command`.
.. versionadded:: 2.0
Returns
-------
Callable[..., :class:`.UserCommand`]
A decorator that converts the provided method into a :class:`.UserCommand`.
"""
return application_command(cls=UserCommand, **kwargs)
[docs]
def message_command(**kwargs):
"""Decorator for message commands that invokes :func:`application_command`.
.. versionadded:: 2.0
Returns
-------
Callable[..., :class:`.MessageCommand`]
A decorator that converts the provided method into a :class:`.MessageCommand`.
"""
return application_command(cls=MessageCommand, **kwargs)
[docs]
def application_command(cls=SlashCommand, **attrs):
"""A decorator that transforms a function into an :class:`.ApplicationCommand`. More specifically,
usually one of :class:`.SlashCommand`, :class:`.UserCommand`, or :class:`.MessageCommand`. The exact class
depends on the ``cls`` parameter.
By default, the ``description`` attribute is received automatically from the
docstring of the function and is cleaned up with the use of
``inspect.cleandoc``. If the docstring is ``bytes``, then it is decoded
into :class:`str` using utf-8 encoding.
The ``name`` attribute also defaults to the function name unchanged.
.. versionadded:: 2.0
Parameters
----------
cls: :class:`.ApplicationCommand`
The class to construct with. By default, this is :class:`.SlashCommand`.
You usually do not change this.
attrs
Keyword arguments to pass into the construction of the class denoted
by ``cls``.
Returns
-------
Callable[..., :class:`.ApplicationCommand`]
A decorator that converts the provided method into an :class:`.ApplicationCommand`, or subclass of it.
Raises
------
TypeError
If the function is not a coroutine or is already a command.
"""
def decorator(func: Callable) -> cls:
if isinstance(func, ApplicationCommand):
func = func.callback
elif not callable(func):
raise TypeError(
"func needs to be a callable or a subclass of ApplicationCommand."
)
return cls(func, **attrs)
return decorator
[docs]
def command(**kwargs):
"""An alias for :meth:`application_command`.
.. note::
This decorator is overridden by :func:`ext.commands.command`.
.. versionadded:: 2.0
Returns
-------
Callable[..., :class:`.ApplicationCommand`]
A decorator that converts the provided method into an :class:`.ApplicationCommand`.
"""
return application_command(**kwargs)
docs = "https://discord.com/developers/docs"
valid_locales = [
"da",
"de",
"en-GB",
"en-US",
"es-ES",
"fr",
"hr",
"it",
"lt",
"hu",
"nl",
"no",
"pl",
"pt-BR",
"ro",
"fi",
"sv-SE",
"vi",
"tr",
"cs",
"el",
"bg",
"ru",
"uk",
"hi",
"th",
"zh-CN",
"ja",
"zh-TW",
"ko",
]
# Validation
def validate_chat_input_name(name: Any, locale: str | None = None):
# Must meet the regex ^[-_\w\d\u0901-\u097D\u0E00-\u0E7F]{1,32}$
if locale is not None and locale not in valid_locales:
raise ValidationError(
f"Locale '{locale}' is not a valid locale, see {docs}/reference#locales for"
" list of supported locales."
)
error = None
if not isinstance(name, str):
error = TypeError(
f'Command names and options must be of type str. Received "{name}"'
)
elif not re.match(r"^[-_\w\d\u0901-\u097D\u0E00-\u0E7F]{1,32}$", name):
error = ValidationError(
r"Command names and options must follow the regex"
r" \"^[-_\w\d\u0901-\u097D\u0E00-\u0E7F]{1,32}$\". "
"For more information, see"
f" {docs}/interactions/application-commands#application-command-object-"
f'application-command-naming. Received "{name}"'
)
elif (
name.lower() != name
): # Can't use islower() as it fails if none of the chars can be lowered. See #512.
error = ValidationError(
f'Command names and options must be lowercase. Received "{name}"'
)
if error:
if locale:
error.args = (f"{error.args[0]} in locale {locale}",)
raise error
def validate_chat_input_description(description: Any, locale: str | None = None):
if locale is not None and locale not in valid_locales:
raise ValidationError(
f"Locale '{locale}' is not a valid locale, see {docs}/reference#locales for"
" list of supported locales."
)
error = None
if not isinstance(description, str):
error = TypeError(
"Command and option description must be of type str. Received"
f' "{description}"'
)
elif not 1 <= len(description) <= 100:
error = ValidationError(
"Command and option description must be 1-100 characters long. Received"
f' "{description}"'
)
if error:
if locale:
error.args = (f"{error.args[0]} in locale {locale}",)
raise error