# This file is part of curious.
#
# curious is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# curious is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with curious. If not, see <http://www.gnu.org/licenses/>.
"""
Wrappers for Channel objects.
.. currentmodule:: curious.dataclasses.channel
"""
import collections
import enum
import pathlib
import time
import typing as _typing
from math import floor
from os import PathLike
from types import MappingProxyType
from typing import AsyncIterator
import multio
from async_generator import asynccontextmanager
from curious.dataclasses import guild as dt_guild, invite as dt_invite, member as dt_member, \
message as dt_message, permissions as dt_permissions, role as dt_role, user as dt_user, \
webhook as dt_webhook
from curious.dataclasses.bases import Dataclass, IDObject
from curious.dataclasses.embed import Embed
from curious.exc import CuriousError, ErrorCode, Forbidden, HTTPException, PermissionsError
from curious.util import AsyncIteratorWrapper, base64ify, deprecated, safe_generator
[docs]class ChannelType(enum.IntEnum):
"""
Returns a mapping from Discord channel type.
"""
#: Represents a text channel.
TEXT = 0
#: Represents a private channel.
PRIVATE = 1
#: Represents a voice channel.
VOICE = 2
#: Represents a group channel.
GROUP = 3
#: Represents a category channel.
CATEGORY = 4
def has_messages(self) -> bool:
"""
:return: If this channel type has messages.
"""
return self not in [ChannelType.VOICE, ChannelType.CATEGORY]
[docs]class HistoryIterator(collections.AsyncIterator):
"""
An iterator that allows you to automatically fetch messages and async iterate over them.
.. code-block:: python3
it = HistoryIterator(some_channel, bot, max_messages=100)
# usage 1
async for message in it:
...
# usage 2
await it.fill_messages()
for message in it.messages:
...
Note that usage 2 will only fill chunks of 100 messages at a time.
"""
def __init__(self, channel: 'Channel',
max_messages: int = -1, *,
before: int = None, after: int = None):
"""
:param channel: The :class:`.Channel` to iterate over.
:param max_messages: The maximum number of messages to return. <= 0 means infinite.
:param before: The message ID to fetch before.
:param after: The message ID to fetch after.
.. versionchanged:: 0.7.0
Removed the ``client`` parameter.
"""
self.channel = channel
#: The current storage of messages.
self.messages = collections.deque()
#: The current count of messages iterated over.
#: This is used to know when to automatically fill new messages.
self.current_count = 0
#: The maximum amount of messages to use.
#: If this is <= 0, an infinite amount of messages are returned.
self.max_messages = max_messages
#: The message ID of before to fetch.
self.before = before
if isinstance(self.before, IDObject):
self.before = self.before.id
#: The message ID of after to fetch.
self.after = after
if isinstance(self.after, IDObject):
self.after = self.after.id
#: The last message ID that we fetched.
if self.before:
self.last_message_id = self.before
else:
self.last_message_id = self.after
[docs] async def fill_messages(self) -> None:
"""
Called to fill the next <n> messages.
This is called automatically by :meth:`.__anext__`, but can be used to fill the messages
anyway.
"""
if self.max_messages < 0:
to_get = 100
else:
to_get = self.max_messages - self.current_count
if to_get <= 0:
return
if self.before:
messages = await self.channel._bot.http.get_message_history(self.channel.id,
before=self.last_message_id,
limit=to_get)
else:
messages = await self.channel._bot.http.get_message_history(self.channel.id,
after=self.last_message_id)
messages = reversed(messages)
for message in messages:
self.messages.append(self.channel._bot.state.make_message(message))
async def __anext__(self) -> 'dt_message.Message':
self.current_count += 1
if self.current_count == self.max_messages:
raise StopAsyncIteration
if len(self.messages) <= 0:
await self.fill_messages()
try:
message = self.messages.popleft()
except IndexError:
# No messages to fill, so self._fill_messages didn't return any
# This signals the end of iteration.
raise StopAsyncIteration
self.last_message_id = message.id
return message
def __iter__(self) -> None:
raise RuntimeError("This is not an iterator - you want to use `async for` instead.")
def __await__(self) -> None:
raise RuntimeError("This is not a coroutine - you want to use `async for` instead.")
[docs] async def next(self) -> 'dt_message.Message':
"""
Gets the next item in history.
"""
return await self.__anext__()
[docs] async def all(self) -> '_typing.List[dt_message.Message]':
"""
Gets a flattened list of items from the history.
"""
items = []
async for item in self:
items.append(item)
return items
[docs]class ChannelMessageWrapper(object):
"""
Represents a channel's message container.
"""
__slots__ = "channel",
def __init__(self, channel: 'Channel'):
#: The :class:`.Channel` this container is used for.
self.channel = channel
def __iter__(self) -> None:
raise RuntimeError("Use `async for`")
def __aiter__(self) -> HistoryIterator:
return self.history.__aiter__()
@property
def history(self) -> HistoryIterator:
"""
:return: A :class:`.HistoryIterator` that can be used to iterate over the channel history.
"""
return self.get_history(before=self.channel._last_message_id, limit=-1)
[docs] def get_history(self, before: int = None,
after: int = None,
limit: int = 100) -> HistoryIterator:
"""
Gets history for this channel.
This is *not* an async function - it returns a :class:`HistoryIterator` which can be async
iterated over to get message history.
.. code-block:: python3
async for message in channel.get_history(limit=1000):
print(message.content, "by", message.author.user.name)
:param limit: The maximum number of messages to get.
:param before: The snowflake ID to get messages before.
:param after: The snowflake ID to get messages after.
"""
if self.channel.guild:
if not self.channel.permissions(self.channel.guild.me).read_message_history:
raise PermissionsError("read_message_history")
return HistoryIterator(self.channel, before=before, after=after, max_messages=limit)
[docs] async def send(self, content: str = None, *,
tts: bool = False, embed: 'Embed' = None) -> 'dt_message.Message':
"""
Sends a message to this channel.
This requires SEND_MESSAGES permission in the channel.
If the content is not a string, it will be automatically stringified.
.. code:: python
await channel.send("Hello, world!")
:param content: The content of the message to send.
:param tts: Should this message be text to speech?
:param embed: An embed object to send with this message.
:return: A new :class:`.Message` object.
"""
if not self.channel.type.has_messages():
raise CuriousError("Cannot send messages to a voice channel")
if self.channel.guild:
if not self.channel.permissions(self.channel.guild.me).send_messages:
raise PermissionsError("send_messages")
if not isinstance(content, str) and content is not None:
content = str(content)
# check for empty messages
if not content:
if not embed:
raise CuriousError("Cannot send an empty message")
if self.channel.guild and not \
self.channel.permissions(self.channel.guild.me).embed_links:
raise PermissionsError("embed_links")
else:
if content and len(content) > 2000:
raise CuriousError("Content must be less than 2000 characters")
if embed is not None:
embed = embed.to_dict()
data = await self.channel._bot.http.send_message(self.channel.id, content,
tts=tts, embed=embed)
obb = self.channel._bot.state.make_message(data, cache=True)
return obb
[docs] async def upload(self, fp: '_typing.Union[bytes, str, PathLike, _typing.IO]',
*,
filename: str = None,
message_content: '_typing.Optional[str]' = None) -> 'dt_message.Message':
"""
Uploads a message to this channel.
This requires SEND_MESSAGES and ATTACH_FILES permission in the channel.
.. code-block:: python3
with open("/tmp/emilia_best_girl.jpg", 'rb') as f:
await channel.messages.upload(f, "my_waifu.jpg")
:param fp: Variable.
- If passed a string or a :class:`os.PathLike`, will open and read the file and
upload it.
- If passed bytes, will use the bytes as the file content.
- If passed a file-like, will read and use the content to upload.
:param filename: The filename for the file uploaded. If a path-like or str is passed, \
will use the filename from that if this is not specified.
:param message_content: Optional: Any extra content to be sent with the message.
:return: The new :class:`.Message` created.
"""
if not self.channel.type.has_messages():
raise CuriousError("Cannot send messages to a voice channel")
if self.channel.guild:
if not self.channel.permissions(self.channel.guild.me).send_messages:
raise PermissionsError("send_messages")
if not self.channel.permissions(self.channel.guild.me).attach_files:
raise PermissionsError("attach_files")
if isinstance(fp, bytes):
file_content = fp
elif isinstance(fp, pathlib.Path):
if filename is None:
filename = fp.parts[-1]
file_content = fp.read_bytes()
elif isinstance(fp, (str, PathLike)):
path = pathlib.Path(fp)
if filename is None:
filename = path.parts[-1]
file_content = path.read_bytes()
elif isinstance(fp, _typing.IO) or hasattr(fp, "read"):
file_content = fp.read()
if isinstance(file_content, str):
file_content = file_content.encode("utf-8")
else:
raise ValueError("Got unknown type for upload")
if filename is None:
filename = "unknown.bin"
data = await self.channel._bot.http.send_file(self.channel.id, file_content,
filename=filename, content=message_content)
obb = self.channel._bot.state.make_message(data, cache=False)
return obb
[docs] async def bulk_delete(self, messages: '_typing.List[dt_message.Message]') -> int:
"""
Deletes messages from a channel.
This is the low-level delete function - for the high-level function, see
:meth:`.Channel.messages.purge()`.
Example for deleting all the last 100 messages:
.. code:: python
history = channel.messages.get_history(limit=100)
messages = []
async for message in history:
messages.append(message)
await channel.messages.bulk_delete(messages)
:param messages: A list of :class:`.Message` objects to delete.
:return: The number of messages deleted.
"""
if self.channel.guild:
if not self.channel.permissions(self.channel.guild.me).manage_messages:
raise PermissionsError("manage_messages")
minimum_allowed = floor((time.time() - 14 * 24 * 60 * 60) * 1000.0 - 1420070400000) << 22
ids = []
for message in messages:
if message.id < minimum_allowed:
msg = f"Cannot delete message id {message.id} older than {minimum_allowed}"
raise CuriousError(msg)
ids.append(message.id)
await self.channel._bot.http.delete_multiple_messages(self.channel.id, ids)
return len(ids)
[docs] async def purge(self, limit: int = 100, *,
author: 'dt_member.Member' = None,
content: str = None,
predicate: '_typing.Callable[[dt_message.Message], bool]' = None,
fallback_from_bulk: bool = False):
"""
Purges messages from a channel.
This will attempt to use ``bulk-delete`` if possible, but otherwise will use the normal
delete endpoint (which can get ratelimited severely!) if ``fallback_from_bulk`` is True.
Example for deleting all messages owned by the bot:
.. code-block:: python3
me = channel.guild.me
await channel.messages.purge(limit=100, author=me)
Custom check functions can also be applied which specify any extra checks. They take one
argument (the Message object) and return a boolean (True or False) determining if the
message should be deleted.
For example, to delete all messages with the letter ``i`` in them:
.. code-block:: python3
await channel.messages.purge(limit=100,
predicate=lambda message: 'i' in message.content)
:param limit: The maximum amount of messages to delete. -1 for unbounded size.
:param author: Only delete messages made by this author.
:param content: Only delete messages that exactly match this content.
:param predicate: A callable that determines if a message should be deleted.
:param fallback_from_bulk: If this is True, messages will be regular deleted if they \
cannot be bulk deleted.
:return: The number of messages deleted.
"""
if self.channel.guild:
if not self.channel.permissions(self.channel.guild.me).manage_messages \
and not fallback_from_bulk:
raise PermissionsError("manage_messages")
checks = []
if author:
checks.append(lambda m: m.author == author)
if content:
checks.append(lambda m: m.content == content)
if predicate:
checks.append(predicate)
to_delete = []
history = self.get_history(limit=limit)
async for message in history:
if all(check(message) for check in checks):
to_delete.append(message)
can_bulk_delete = True
# Split into chunks of 100.
message_chunks = [to_delete[i:i + 100] for i in range(0, len(to_delete), 100)]
minimum_allowed = floor((time.time() - 14 * 24 * 60 * 60) * 1000.0 - 1420070400000) << 22
for chunk in message_chunks:
message_ids = []
for message in chunk:
if message.id < minimum_allowed:
msg = f"Cannot delete message id {message.id} older than {minimum_allowed}"
raise CuriousError(msg)
message_ids.append(message.id)
# First, try and bulk delete all the messages.
if can_bulk_delete:
try:
await self.channel._bot.http.delete_multiple_messages(self.channel.id,
message_ids)
except Forbidden:
# We might not have MANAGE_MESSAGES.
# Check if we should fallback on normal delete.
can_bulk_delete = False
if not fallback_from_bulk:
# Don't bother, actually.
raise
# This is an `if not` instead of an `else` because `can_bulk_delete` might've changed.
if not can_bulk_delete:
# Instead, just delete() the message.
for message in chunk:
await message.delete()
return len(to_delete)
[docs] async def get(self, message_id: int) -> 'dt_message.Message':
"""
Gets a single message from this channel.
.. versionchanged:: 0.7.0
Errors raised are now consistent across bots and userbots.
:param message_id: The message ID to retrieve.
:return: A new :class:`.Message` object.
:raises CuriousError: If the message could not be found.
"""
if self.channel.guild:
if not self.channel.permissions(self.channel.guild.me).read_message_history:
raise PermissionsError("read_message_history")
cached_message = self.channel._bot.state.find_message(message_id)
if cached_message is not None:
return cached_message
try:
data = await self.channel._bot.http.get_message(self.channel.id, message_id)
except HTTPException as e:
# transform into a CuriousError if it wasn't found
if e.error_code == ErrorCode.UNKNOWN_MESSAGE:
raise CuriousError("No message found for this ID") from e
raise
msg = self.channel._bot.state.make_message(data)
return msg
[docs]class Channel(Dataclass):
"""
Represents a channel object.
"""
def __init__(self, client, **kwargs) -> None:
super().__init__(kwargs.get("id"), client)
#: The name of this channel.
self.name = kwargs.get("name", None) # type: str
#: The topic of this channel.
self.topic = kwargs.get("topic", None) # type: str
#: The ID of the guild this is associated with.
self.guild_id = int(kwargs.get("guild_id", 0)) or None # type: int
parent_id = kwargs.get("parent_id")
if parent_id is not None:
parent_id = int(parent_id)
#: The parent ID of this channel.
self.parent_id = parent_id # type: int
#: The :class:`.ChannelType` of channel this channel is.
self.type = ChannelType(kwargs.get("type", 0)) # type: ChannelType
#: The :class:`.ChannelMessageWrapper` for this channel.
self._messages = None # type: ChannelMessageWrapper
#: If this channel is NSFW.
self.nsfw = kwargs.get("nsfw", False) # type: bool
#: If private, the mapping of :class:`.User` that are in this channel.
self._recipients = {} # type: _typing.Dict[int, dt_user.User]
if self.private:
for recipient in kwargs.get("recipients", []):
u = self._bot.state.make_user(recipient)
self._recipients[u.id] = u
if self.type == ChannelType.GROUP:
# append the current user
self._recipients[self._bot.user.id] = self._bot.user
#: The position of this channel.
self.position = kwargs.get("position", 0) # type: int
#: The last message ID of this channel.
#: Used for history.
self._last_message_id = None # type: int
_last_message_id = kwargs.get("last_message_id", 0)
if _last_message_id:
self._last_message_id = int(_last_message_id)
else:
self._last_message_id = None
# group channel stuff
#: The owner ID of the channel.
#: This is None for non-group channels.
self.owner_id = int(kwargs.get("owner_id", 0)) or None # type: int
#: The icon hash of the channel.
self.icon_hash = kwargs.get("icon", None) # type: str
#: The internal overwrites for this channel.
self._overwrites = {} # type: _typing.Dict[int, dt_permissions.Overwrite]
def __repr__(self) -> str:
return f"<Channel id={self.id} name={self.name} type={self.type.name} " \
f"guild_id={self.guild_id}>"
__str__ = __repr__
[docs] def _update_overwrites(self, overwrites: _typing.List[dict]):
"""
Updates the overwrites for this channel.
:param overwrites: A list of overwrite dicts.
"""
if not self.guild_id:
raise CuriousError("A channel without a guild cannot have overwrites")
self._overwrites = {}
for overwrite in overwrites:
id_ = int(overwrite["id"])
type_ = overwrite["type"]
if type_ == "member":
obb = self.guild._members.get(id_)
else:
obb = self.guild._members.get(id_)
self._overwrites[id] = dt_permissions.Overwrite(allow=overwrite["allow"],
deny=overwrite["deny"],
obb=obb, channel_id=self.id)
@property
def guild(self) -> '_typing.Union[dt_guild.Guild, None]':
"""
:return: The :class:`.Guild` associated with this Channel.
"""
try:
return self._bot.guilds[self.guild_id]
except KeyError:
return None
@property
def private(self) -> bool:
"""
:return: If this channel is a private channel (i.e has no guild.)
"""
return self.guild_id is None
@property
def recipients(self) -> '_typing.Mapping[int, dt_user.User]':
"""
:return: A mapping of int -> :class:`.User` for the recipients of this private chat.
"""
return MappingProxyType(self._recipients)
@property
def user(self) -> '_typing.Union[dt_user.User, None]':
"""
:return: If this channel is a private channel, the :class:`.User` of the other user.
"""
if self.type != ChannelType.PRIVATE:
return None
try:
return next(iter(self.recipients.values()))
except StopIteration:
return None
@property
def owner(self) -> '_typing.Union[dt_user.User, None]':
"""
:return: If this channel is a group channel, the owner of the channel.
"""
if not self.owner_id:
return None
try:
return self._bot.state._users[self.owner_id]
except KeyError:
return None
@property
def parent(self) -> '_typing.Union[Channel, None]':
"""
:return: If this channel has a parent, the parent category of this channel.
"""
try:
return self.guild.channels[self.parent_id]
except (KeyError, AttributeError):
return None
@property
def children(self) -> '_typing.List[Channel]':
"""
:return: A list of :class:`.Channel` children this channel has, if any.
"""
if not self.guild:
return []
channels = [channel
for channel in self.guild.channels.values()
if channel.parent_id == self.id]
return channels
[docs] def get_by_name(self, name: str) -> '_typing.Union[Channel, None]':
"""
Gets a channel by name in this channel's children.
:param name: The name of the channel to get.
:return: A :class:`.Channel` if the channel was find
"""
return next(filter(lambda channel: channel.name == name, self.children), None)
@property
def messages(self) -> 'ChannelMessageWrapper':
"""
:return: The :class:`.ChannelMessageWrapper` for this channel, if applicable.
"""
if not self.type.has_messages():
raise CuriousError("This channel does not have messages")
if self._messages is None:
self._messages = ChannelMessageWrapper(self)
return self._messages
@property
@deprecated(since="0.7.0", see_instead="Channel.messages", removal="0.9.0")
def history(self) -> HistoryIterator:
"""
:return: A :class:`.HistoryIterator` that can be used to iterate over the channel history.
"""
return self.messages.history
@property
def pins(self) -> '_typing.AsyncIterator[dt_message.Message]':
"""
:return: A :class:`.AsyncIteratorWrapper` that can be used to iterate over the pins.
"""
return AsyncIteratorWrapper(self.get_pins)
@property
def icon_url(self) -> _typing.Union[str, None]:
"""
:return: The icon URL for this channel if it is a group DM.
"""
return "https://cdn.discordapp.com/channel-icons/{}/{}.webp" \
.format(self.id, self.icon_hash)
@property
def voice_members(self) -> '_typing.List[dt_member.Member]':
"""
:return: A list of members that are in this voice channel.
"""
if self.type != ChannelType.VOICE:
raise CuriousError("No members for channels that aren't voice channels")
return list(
filter(lambda member: member.voice.channel == self, self.guild.members.values())
)
@property
def overwrites(self) -> '_typing.Mapping[int, dt_permissions.Overwrite]':
"""
:return: A mapping of target_id -> :class:`.Overwrite` for this channel.
"""
return MappingProxyType(self._overwrites)
[docs] def permissions(self, obb: '_typing.Union[dt_member.Member, dt_role.Role]') -> \
'dt_permissions.Overwrite':
"""
Gets the permission overwrites for the specified object.
"""
overwrite = self._overwrites.get(obb.id)
if not overwrite:
overwrite = dt_permissions.Overwrite(0, 0, obb)
overwrite.channel_id = self.id
return overwrite
@property
def me_permissions(self) -> 'dt_permissions.Overwrite':
"""
:return: The overwrite permissions for the current member.
"""
return self.permissions(self.guild.me)
def _copy(self):
obb = object.__new__(self.__class__)
obb.name = self.name
obb.type = self.type
obb.guild_id = self.guild_id
obb.nsfw = self.nsfw
obb._recipients = self._recipients
obb.icon_hash = self.icon_hash
obb.owner_id = self.owner_id
obb.topic = self.topic
obb.position = self.position
obb._bot = self._bot
obb.parent_id = self.parent_id
return obb
[docs] @deprecated(since="0.7.0", see_instead="Channel.messages.get_history", removal="0.9.0")
def get_history(self, before: int = None,
after: int = None,
limit: int = 100) -> HistoryIterator:
"""
Gets history for this channel.
This is *not* an async function - it returns a :class:`HistoryIterator` which can be async
iterated over to get message history.
.. code-block:: python3
async for message in channel.get_history(limit=1000):
print(message.content, "by", message.author.user.name)
:param limit: The maximum number of messages to get.
:param before: The snowflake ID to get messages before.
:param after: The snowflake ID to get messages after.
"""
return self.messages.get_history(before=before, after=after, limit=limit)
[docs] async def get_pins(self) -> '_typing.List[dt_message.Message]':
"""
Gets the pins for a channel.
:return: A list of :class:`.Message` objects.
"""
msg_data = await self._bot.http.get_pins(self.id)
messages = []
for message in msg_data:
messages.append(self._bot.state.make_message(message))
return messages
@property
def webhooks(self) -> 'AsyncIterator[dt_webhook.Webhook]':
"""
:return: A :class:`.AsyncIteratorWrapper` for the :class:`.Webhook` objects in this \
channel.
"""
return AsyncIteratorWrapper(self.get_webhooks)
[docs] async def get_webhooks(self) -> '_typing.List[dt_webhook.Webhook]':
"""
Gets the webhooks for this channel.
:return: A list of :class:`.Webhook` objects for the channel.
"""
webhooks = await self._bot.http.get_webhooks_for_channel(self.id)
obbs = []
for webhook in webhooks:
obbs.append(self._bot.state.make_webhook(webhook))
return obbs
[docs] @deprecated(since="0.7.0", see_instead="Channel.messages.get", removal="0.9.0")
async def get_message(self, message_id: int) -> 'dt_message.Message':
"""
Gets a single message from this channel.
:param message_id: The message ID to retrieve.
:return: A new :class:`.Message` object.
"""
return await self.messages.get(message_id)
[docs] async def create_webhook(self, *, name: str = None,
avatar: bytes = None) -> 'dt_webhook.Webhook':
"""
Create a webhook in this channel.
:param name: The name of the new webhook.
:param avatar: The bytes content of the new webhook.
:return: A :class:`.Webhook` that represents the webhook created.
"""
if not self.permissions(self.guild.me).manage_webhooks:
raise PermissionsError("manage_webhooks")
if avatar is not None:
avatar = base64ify(avatar)
data = await self._bot.http.create_webhook(self.id, name=name, avatar=avatar)
webook = self._bot.state.make_webhook(data)
return webook
[docs] async def edit_webhook(self, webhook: 'dt_webhook.Webhook', *,
name: str = None, avatar: bytes = None) -> 'dt_webhook.Webhook':
"""
Edits a webhook.
:param webhook: The :class:`.Webhook` to edit.
:param name: The new name for the webhook.
:param avatar: The new bytes for the avatar.
:return: The modified :class:`.Webhook`. object.
"""
if avatar is not None:
avatar = base64ify(avatar)
if webhook.token is not None:
# Edit it unconditionally.
await self._bot.http.edit_webhook_with_token(webhook.id, webhook.token,
name=name, avatar=avatar)
if not self.permissions(self.guild.me).manage_webhooks:
raise PermissionsError("manage_webhooks")
data = await self._bot.http.edit_webhook(webhook.id,
name=name, avatar=avatar)
webhook.default_name = data.get("name")
webhook._default_avatar = data.get("avatar")
webhook.user.username = data.get("name")
webhook.user.avatar_hash = data.get("avatar")
return webhook
[docs] async def delete_webhook(self, webhook: 'dt_webhook.Webhook') -> 'dt_webhook.Webhook':
"""
Deletes a webhook.
You must have MANAGE_WEBHOOKS to delete this webhook.
:param webhook: The :class:`.Webhook` to delete.
"""
if webhook.token is not None:
# Delete it unconditionally.
await self._bot.http.delete_webhook_with_token(webhook.id, webhook.token)
return webhook
if not self.permissions(self.guild.me).manage_webhooks:
raise PermissionsError("manage_webhooks")
await self._bot.http.delete_webhook(webhook.id)
return webhook
[docs] async def create_invite(self, **kwargs) -> 'dt_invite.Invite':
"""
Creates an invite in this channel.
:param max_age: The maximum age of the invite.
:param max_uses: The maximum uses of the invite.
:param temporary: Is this invite temporary?
:param unique: Is this invite unique?
"""
if not self.guild:
raise PermissionsError("create_instant_invite")
if not self.permissions(self.guild.me).create_instant_invite:
raise PermissionsError("create_instant_invite")
inv = await self._bot.http.create_invite(self.id, **kwargs)
invite = dt_invite.Invite(self._bot, **inv)
return invite
[docs] @deprecated(since="0.7.0", see_instead="Channel.messages.delete_messages",
removal="0.9.0")
async def delete_messages(self, messages: '_typing.List[dt_message.Message]') -> int:
"""
Deletes messages from a channel.
This is the low-level delete function - for the high-level function, see
:meth:`.Channel.purge()`.
Example for deleting all the last 100 messages:
.. code:: python
history = channel.get_history(limit=100)
messages = []
async for message in history:
messages.append(message)
await channel.delete_messages(messages)
:param messages: A list of :class:`.Message` objects to delete.
:return: The number of messages deleted.
"""
return await self.messages.bulk_delete(messages)
[docs] @deprecated(since="0.7.0", see_instead="Channel.messages.purge", removal="0.9.0")
async def purge(self, limit: int = 100, *,
author: 'dt_member.Member' = None,
content: str = None,
predicate: '_typing.Callable[[dt_message.Message], bool]' = None,
fallback_from_bulk: bool = False):
"""
Purges messages from a channel.
This will attempt to use ``bulk-delete`` if possible, but otherwise will use the normal
delete endpoint (which can get ratelimited severely!) if ``fallback_from_bulk`` is True.
Example for deleting all messages owned by the bot:
.. code-block:: python3
me = channel.guild.me
await channel.purge(limit=100, author=me)
Custom check functions can also be applied which specify any extra checks. They take one
argument (the Message object) and return a boolean (True or False) determining if the
message should be deleted.
For example, to delete all messages with the letter ``i`` in them:
.. code-block:: python3
await channel.purge(limit=100, predicate=lambda message: 'i' in message.content)
:param limit: The maximum amount of messages to delete. -1 for unbounded size.
:param author: Only delete messages made by this author.
:param content: Only delete messages that exactly match this content.
:param predicate: A callable that determines if a message should be deleted.
:param fallback_from_bulk: If this is True, messages will be regular deleted if they \
cannot be bulk deleted.
:return: The number of messages deleted.
"""
return await self.messages.purge(limit=limit, author=author, content=content,
predicate=predicate, fallback_from_bulk=fallback_from_bulk)
[docs] async def send_typing(self) -> None:
"""
Starts typing in the channel for 5 seconds.
"""
if not self.type.has_messages():
raise CuriousError("Cannot send messages to this channel")
if self.guild:
if not self.permissions(self.guild.me).send_messages:
raise PermissionsError("send_message")
await self._bot.http.send_typing(self.id)
@property
@asynccontextmanager
@safe_generator
async def typing(self) -> _typing.AsyncContextManager[None]:
"""
:return: A context manager that sends typing repeatedly.
Usage:
.. code-block:: python3
async with channel.typing:
res = await do_long_action()
await channel.messages.send("Long action:", res)
"""
running = multio.Event()
async def runner():
await self.send_typing()
while True:
try:
async with multio.timeout_after(5):
await running.wait()
except multio.asynclib.TaskTimeout:
await self.send_typing()
else:
return
async with multio.asynclib.task_manager() as tg:
await multio.asynclib.spawn(tg, runner)
try:
yield
finally:
await multio.asynclib.cancel_task_group(tg)
[docs] @deprecated(since="0.7.0", see_instead="Channel.messages.send", removal="0.10.0")
async def send(self, content: str = None, *,
tts: bool = False, embed: Embed = None) -> 'dt_message.Message':
"""
Sends a message to this channel.
This requires SEND_MESSAGES permission in the channel.
If the content is not a string, it will be automatically stringified.
.. code:: python
await channel.send("Hello, world!")
:param content: The content of the message to send.
:param tts: Should this message be text to speech?
:param embed: An embed object to send with this message.
:return: A new :class:`.Message` object.
"""
return await self.messages.send(content, tts=tts, embed=embed)
[docs] @deprecated(since="0.7.0", see_instead="Channel.messages.upload", removal="0.10.0")
async def send_file(self, file_content: bytes, filename: str,
*, message_content: _typing.Optional[str] = None) -> 'dt_message.Message':
"""
Uploads a message to this channel.
This requires SEND_MESSAGES and ATTACH_FILES permission in the channel.
.. code:: python
with open("/tmp/emilia_best_girl.jpg", 'rb') as f:
await channel.send_file(f.read(), "my_waifu.jpg")
:param file_content: The bytes-like file content to upload.
This **cannot** be a file-like object.
:param filename: The filename of the file.
:param message_content: Optional: Any extra content to be sent with the message.
:return: The new :class:`.Message` created.
"""
return await self.messages.upload(file_content, filename, message_content=message_content)
[docs] @deprecated(since="0.7.0", see_instead="Channel.messages.upload", removal="0.10.0")
async def upload_file(self, filename: str, *,
message_content: str = None) -> 'dt_message.Message':
"""
A higher level interface to ``send_file``.
This allows you to specify one of the following to upload:
- A filename (str)
- A file-like object
- A path-like object
This will open the file, read it in binary, and upload it to the channel.
:param filename: The file to send, in the formats specified above.
:param message_content: Any extra content to be sent with the message.
:return: The new :class:`.Message` created.
"""
return await self.messages.upload(fp=filename, filename=filename,
message_content=message_content)
[docs] async def change_overwrite(self, overwrite: 'dt_permissions.Overwrite'):
"""
Changes an overwrite for this channel.
This overwrite must be an instance of :class:`.Overwrite`.
:param overwrite: The specific overwrite to use.
If this is None, the overwrite will be deleted.
"""
if not self.guild:
raise PermissionsError("manage_roles")
if not self.permissions(self.guild.me).manage_roles:
raise PermissionsError("manage_roles")
target = overwrite.target
if isinstance(target, dt_member.Member):
type_ = "member"
else:
type_ = "role"
if overwrite is None:
# Delete the overwrite instead.
coro = self._bot.http.remove_overwrite(channel_id=self.id, target_id=target.id)
async def _listener(before, after):
if after.id != self.id:
return False
# probably right /shrug
return True
else:
coro = self._bot.http.edit_overwrite(self.id, target.id, type_,
allow=overwrite.allow.bitfield,
deny=overwrite.deny.bitfield)
async def _listener(before, after):
return after.id == self.id
async with self._bot.events.wait_for_manager("channel_update", _listener):
await coro
return self
[docs] async def edit(self, **kwargs) -> 'Channel':
"""
Edits this channel.
"""
if self.guild is None:
raise CuriousError("Can only edit guild channels")
if not self.permissions(self.guild.me).manage_channels:
raise PermissionsError("manage_channels")
if "type_" in kwargs:
kwargs["type"] = kwargs["type_"]
if "type" not in kwargs:
kwargs["type"] = self.type
if "parent" in kwargs:
kwargs["parent_id"] = kwargs["parent"].id
await self._bot.http.edit_channel(self.id, **kwargs)
return self
[docs] async def delete(self) -> 'Channel':
"""
Deletes this channel.
"""
if not self.permissions(self.guild.me).manage_channels:
raise PermissionsError("manaqe_channels")
await self._bot.http.delete_channel(self.id)
return self
[docs] async def connect(self):
"""
Connects to voice in this channel.
"""
if self.type != ChannelType.VOICE:
raise CuriousError("Cannot connect to a text channel")
return await self.guild.connect_to_voice(self)