Source code for curious.ext.paginator
# This file is part of curious.
#
# curious is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# curious is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with curious. If not, see <http://www.gnu.org/licenses/>.
"""
A reactions-based paginator.
"""
import typing
import multio
from curious.core.event import ListenerExit
from curious.dataclasses.channel import Channel
from curious.dataclasses.embed import Embed
from curious.dataclasses.member import Member
from curious.dataclasses.message import Message
from curious.dataclasses.reaction import Reaction
from curious.dataclasses.user import User
[docs]class ReactionsPaginator(object):
"""
A paginator for a message using reactions.
"""
BUTTON_BACKWARDS = "◀"
BUTTON_FORWARD = "▶"
BUTTON_STOP = "⏹"
def __init__(self, content: typing.Union[str, typing.List[str], typing.List[Embed]],
channel: Channel,
respond_to: typing.Union[Member, User], *,
break_at: int = 2000, title: str = None):
"""
:param content: The content to page through.
:param channel: The channel to send the content to.
:param respond_to: The member to respond
:param break_at: The number of characters to break the message up into.
:param title: The title to put above the embed.
"""
self._content = content
self.channel = channel
self.respond_to = respond_to
self.title = title
self.bot = self.channel._bot # hacky af
# chunk the message up
if isinstance(content, list):
self._message_chunks = content
else:
self._message_chunks = [self._content[i:i + break_at] for i in
range(0, len(self._content), break_at)]
#: The current page this paginator is on.
self.page = 0
#: The message object that is being edited.
self._message = None # type: Message
self._running = False
self._reaction_queue = multio.Queue()
[docs] @classmethod
async def paginate_response(cls, content: str,
responding_to: Message, *args, **kwargs) -> 'ReactionsPaginator':
"""
Paginates a response to a message.
:param content: The content to paginate.
:param responding_to: The message object that you are responding to.
"""
obb = cls(content, responding_to.channel, responding_to.author, *args, **kwargs)
await obb.paginate()
return obb
[docs] async def send_current_page(self) -> None:
"""
Sends the current page to the channel.
"""
chunk = self._message_chunks[self.page]
if isinstance(chunk, Embed):
embed = chunk
else:
embed = Embed(description=self._message_chunks[self.page])
embed.set_footer(text="Page {}/{}".format(self.page + 1, len(self._message_chunks)))
if self._message is None:
self._message = await self.channel.messages.send(
content=self.title,
embed=embed
)
else:
await self._message.edit(
new_content=self.title,
embed=embed
)
[docs] async def _add_initial_reactions(self):
"""
Adds the initial reactions to this message.
"""
await self._message.react(self.BUTTON_BACKWARDS)
await self._message.react(self.BUTTON_FORWARD)
await self._message.react(self.BUTTON_STOP)
[docs] async def paginate(self):
"""
Starts paginating this message.
This will continuously listen for reactions on this message until the STOP button is
pressed.
"""
self._running = True
async def consume_reaction(ctx, message: Message, author: Member, reaction: Reaction):
"""
Consumes reaction events and places them on a queue.
"""
if message.id != self._message.id:
return
if author.id != self.respond_to.id:
return
if self._running:
await self._reaction_queue.put(reaction)
else:
raise ListenerExit
# spawn the consumer task first
self.bot.events.add_temporary_listener("message_reaction_add", consume_reaction)
# send the stuff we want
await self.send_current_page()
await self._add_initial_reactions()
try:
while True:
async with multio.asynclib.timeout_after(120):
reaction = await self._reaction_queue.get()
if reaction.emoji == self.BUTTON_FORWARD:
if self.page < len(self._message_chunks) - 1:
self.page += 1
else:
self.page = 0
await self.send_current_page()
if reaction.emoji == self.BUTTON_BACKWARDS:
if self.page > 0:
self.page -= 1
else:
self.page = len(self._message_chunks) - 1
await self.send_current_page()
if reaction.emoji == self.BUTTON_STOP:
# remove all reactions were done here
break
await self._message.unreact(reaction.emoji, self.respond_to)
except multio.asynclib.TaskTimeout:
# eat timeouts but nothing else
pass
self._running = False
# we've broken out of the loop, so remove reactions and cancel the listener
await self._message.remove_all_reactions()
self.bot.events.remove_listener_early("message_reaction_add", consume_reaction)