Merge pull request #3 from Lemon4ksan/dev

Обновление бота #3
This commit is contained in:
Bananchiki
2025-02-28 22:22:29 +03:00
committed by GitHub
8 changed files with 582 additions and 586 deletions

View File

@@ -18,7 +18,7 @@ def setup(bot):
bot.add_cog(General(bot))
async def get_search_suggestions(ctx: discord.AutocompleteContext) -> list[str]:
if not ctx.interaction.user or not ctx.value or len(ctx.value) < 2:
if not ctx.interaction.user or not ctx.value or not (100 > len(ctx.value) > 2):
return []
uid = ctx.interaction.user.id
@@ -41,6 +41,10 @@ async def get_search_suggestions(ctx: discord.AutocompleteContext) -> list[str]:
logging.debug(f"[GENERAL] Searching for '{ctx.value}' for user {uid}")
if content_type not in ('Трек', 'Альбом', 'Артист', 'Плейлист'):
logging.error(f"[GENERAL] Invalid content type '{content_type}' for user {uid}")
return []
if content_type == 'Трек' and search.tracks is not None:
res = [f"{item.title} {f"({item.version})" if item.version else ''} - {", ".join(item.artists_name())}" for item in search.tracks.results]
elif content_type == 'Альбом' and search.albums is not None:
@@ -50,13 +54,13 @@ async def get_search_suggestions(ctx: discord.AutocompleteContext) -> list[str]:
elif content_type == 'Плейлист' and search.playlists is not None:
res = [f"{item.title}" for item in search.playlists.results]
else:
logging.warning(f"[GENERAL] Invalid content type '{content_type}' for user {uid}")
logging.info(f"[GENERAL] Failed to get content type '{content_type}' with name '{ctx.value}' for user {uid}")
return []
return res[:100]
async def get_user_playlists_suggestions(ctx: discord.AutocompleteContext) -> list[str]:
if not ctx.interaction.user or not ctx.value or len(ctx.value) < 2:
if not ctx.interaction.user or not ctx.value or not (100 > len(ctx.value) > 2):
return []
uid = ctx.interaction.user.id
@@ -70,21 +74,25 @@ async def get_user_playlists_suggestions(ctx: discord.AutocompleteContext) -> li
except UnauthorizedError:
logging.info(f"[GENERAL] User {uid} provided invalid token")
return []
logging.debug(f"[GENERAL] Searching for '{ctx.value}' for user {uid}")
playlists_list = await client.users_playlists_list()
logging.debug(f"[GENERAL] Searching for '{ctx.value}' for user {uid}")
try:
playlists_list = await client.users_playlists_list()
except Exception as e:
logging.error(f"[GENERAL] Failed to get playlists for user {uid}: {e}")
return []
return [playlist.title for playlist in playlists_list if playlist.title and ctx.value in playlist.title][:100]
class General(Cog):
def __init__(self, bot: discord.Bot):
self.bot = bot
self.db = BaseGuildsDatabase()
self.users_db = users_db
account = discord.SlashCommandGroup("account", "Команды, связанные с аккаунтом.")
@discord.slash_command(description="Получить информацию о командах YandexMusic.")
@discord.option(
"command",
@@ -208,10 +216,11 @@ class General(Cog):
await ctx.respond("❌ Укажите токен через /account login.", delete_after=15, ephemeral=True)
return
client = await YMClient(token).init()
if not client.me or not client.me.account or not client.me.account.uid:
logging.warning(f"Failed to fetch user info for user {ctx.user.id}")
await ctx.respond('❌ Что-то пошло не так. Повторите попытку позже.', delete_after=15, ephemeral=True)
try:
client = await YMClient(token).init()
except UnauthorizedError:
logging.info(f"[GENERAL] Invalid token for user {ctx.user.id}")
await ctx.respond('❌ Неверный токен. Укажите новый через /account login.', delete_after=15, ephemeral=True)
return
likes = await client.users_likes_tracks()
@@ -256,7 +265,7 @@ class General(Cog):
client = await YMClient(token).init()
except UnauthorizedError:
logging.info(f"[GENERAL] User {ctx.user.id} provided invalid token")
await ctx.respond("❌ Недействительный токен. Если это не так, попробуйте ещё раз.", delete_after=15, ephemeral=True)
await ctx.respond('❌ Неверный токен. Укажите новый через /account login.', delete_after=15, ephemeral=True)
return
search = await client.search(content_type, type_='playlist')
@@ -287,7 +296,7 @@ class General(Cog):
autocomplete=discord.utils.basic_autocomplete(get_user_playlists_suggestions)
)
async def playlist(self, ctx: discord.ApplicationContext, name: str) -> None:
logging.info(f"[GENERAL] Playlists command invoked by user {ctx.user.id} in guild {ctx.guild_id}")
logging.info(f"[GENERAL] Playlist command invoked by user {ctx.user.id} in guild {ctx.guild_id}")
token = await self.users_db.get_ym_token(ctx.user.id)
if not token:
@@ -299,10 +308,15 @@ class General(Cog):
client = await YMClient(token).init()
except UnauthorizedError:
logging.info(f"[GENERAL] User {ctx.user.id} provided invalid token")
await ctx.respond("❌ Недействительный токен. Если это не так, попробуйте ещё раз.", delete_after=15, ephemeral=True)
await ctx.respond('❌ Неверный токен. Укажите новый через /account login.', delete_after=15, ephemeral=True)
return
playlists = await client.users_playlists_list()
try:
playlists = await client.users_playlists_list()
except UnauthorizedError:
logging.warning(f"[GENERAL] Unknown token error for user {ctx.user.id}")
await ctx.respond("❌ Произошла неизвестная ошибка при попытке получения плейлистов. Пожалуйста, сообщите об этом разработчику.", delete_after=15, ephemeral=True)
return
playlist = next((playlist for playlist in playlists if playlist.title == name), None)
if not playlist:

View File

@@ -1,5 +1,5 @@
import logging
from typing import cast
from typing import cast, Final
from math import ceil
from os import getenv
@@ -10,29 +10,35 @@ from PIL import Image
from yandex_music import Track, Album, Artist, Playlist, Label
from discord import Embed
explicit_eid: Final[str | None] = getenv('EXPLICIT_EID')
if not explicit_eid:
raise ValueError('You must specify explicit emoji id in your enviroment (EXPLICIT_EID).')
async def generate_item_embed(item: Track | Album | Artist | Playlist | list[Track], vibing: bool = False) -> Embed:
"""Generate item embed. list[Track] is used for likes. If vibing is True, add vibing image.
Args:
item (yandex_music.Track | yandex_music.Album | yandex_music.Artist | yandex_music.Playlist): Item to be processed.
item (Track | Album | Artist | Playlist | list[Track]): Item to be processed.
vibing (bool, optional): Add vibing image. Defaults to False.
Returns:
discord.Embed: Item embed.
"""
logging.debug(f"[EMBEDS] Generating embed for type: '{type(item).__name__}'")
if isinstance(item, Track):
embed = await _generate_track_embed(item)
elif isinstance(item, Album):
embed = await _generate_album_embed(item)
elif isinstance(item, Artist):
embed = await _generate_artist_embed(item)
elif isinstance(item, Playlist):
embed = await _generate_playlist_embed(item)
elif isinstance(item, list):
embed = _generate_likes_embed(item)
else:
raise ValueError(f"Unknown item type: {type(item).__name__}")
match item:
case Track():
embed = await _generate_track_embed(item)
case Album():
embed = await _generate_album_embed(item)
case Artist():
embed = await _generate_artist_embed(item)
case Playlist():
embed = await _generate_playlist_embed(item)
case list():
embed = _generate_likes_embed(item)
case _:
raise ValueError(f"Unknown item type: {type(item).__name__}")
if vibing:
embed.set_image(
@@ -41,13 +47,12 @@ async def generate_item_embed(item: Track | Album | Artist | Playlist | list[Tra
return embed
def _generate_likes_embed(tracks: list[Track]) -> Embed:
track_count = len(tracks)
cover_url = "https://avatars.yandex.net/get-music-user-playlist/11418140/favorit-playlist-cover.bb48fdb9b9f4/300x300"
embed = Embed(
title="Мне нравится",
description="Треки, которые вам понравились.",
color=0xce3a26,
color=0xce3a26
)
embed.set_thumbnail(url=cover_url)
@@ -56,203 +61,143 @@ def _generate_likes_embed(tracks: list[Track]) -> Embed:
if track.duration_ms:
duration += track.duration_ms
duration_m = duration // 60000
duration_s = ceil(duration / 1000) - duration_m * 60
if duration_s == 60:
duration_m += 1
duration_s = 0
embed.add_field(name="Длительность", value=f"{duration_m}:{duration_s:02}")
if track_count is not None:
embed.add_field(name="Треки", value=str(track_count))
embed.add_field(name="Длительность", value=_format_duration(duration))
embed.add_field(name="Треки", value=str(len(tracks)))
return embed
async def _generate_track_embed(track: Track) -> Embed:
title = cast(str, track.title)
avail = cast(bool, track.available)
artists = track.artists_name()
title = track.title
albums = [cast(str, album.title) for album in track.albums]
lyrics = cast(bool, track.lyrics_available)
duration = cast(int, track.duration_ms)
explicit = track.explicit or track.content_warning
bg_video = track.background_video_uri
metadata = track.meta_data
year = track.albums[0].year
artist = track.artists[0]
year = track.albums[0].year if track.albums else None
artist = track.artists[0] if track.artists else None
cover_url = track.get_cover_url('400x400')
color = await _get_average_color_from_url(cover_url)
if track.cover_uri:
cover_url = track.get_cover_url('400x400')
color = await _get_average_color_from_url(cover_url)
else:
cover_url = None
color = 0x000
if explicit:
explicit_eid = getenv('EXPLICIT_EID')
if not explicit_eid:
raise ValueError('You must specify explicit emoji id in your enviroment (EXPLICIT_EID).')
if explicit and title:
title += ' <:explicit:' + explicit_eid + '>'
duration_m = duration // 60000
duration_s = ceil(duration / 1000) - duration_m * 60
if duration_s == 60:
duration_m += 1
duration_s = 0
if artist:
artist_url = f"https://music.yandex.ru/artist/{artist.id}"
artist_cover = artist.cover
artist_url = f"https://music.yandex.ru/artist/{artist.id}"
artist_cover = artist.cover
if not artist_cover and artist.op_image:
artist_cover_url = artist.get_op_image_url()
elif artist_cover:
artist_cover_url = artist_cover.get_url()
if not artist_cover and artist.op_image:
artist_cover_url = artist.get_op_image_url()
elif artist_cover:
artist_cover_url = artist_cover.get_url()
else:
artist_cover_url = None
else:
artist_url = None
artist_cover_url = None
embed = Embed(
title=title,
description=", ".join(albums),
color=color,
color=color
)
embed.set_thumbnail(url=cover_url)
embed.set_author(name=", ".join(artists), url=artist_url, icon_url=artist_cover_url)
embed.set_author(name=", ".join(track.artists_name()), url=artist_url, icon_url=artist_cover_url)
embed.add_field(name="Текст песни", value="Есть" if lyrics else "Нет")
embed.add_field(name="Длительность", value=f"{duration_m}:{duration_s:02}")
embed.add_field(name="Текст песни", value="Есть" if track.lyrics_available else "Нет")
if isinstance(track.duration_ms, int):
embed.add_field(name="Длительность", value=_format_duration(track.duration_ms))
if year:
embed.add_field(name="Год выпуска", value=str(year))
if metadata:
if metadata.year:
embed.add_field(name="Год выхода", value=str(metadata.year))
if track.background_video_uri:
embed.add_field(name="Видеофон", value=f"[Ссылка]({track.background_video_uri})")
if metadata.number:
embed.add_field(name="Позиция", value=str(metadata.number))
if metadata.composer:
embed.add_field(name="Композитор", value=metadata.composer)
if metadata.version:
embed.add_field(name="Версия", value=metadata.version)
if bg_video:
embed.add_field(name="Видеофон", value=f"[Ссылка]({bg_video})")
if not avail:
if not (track.available or track.available_for_premium_users):
embed.set_footer(text=f"Трек в данный момент недоступен.")
return embed
async def _generate_album_embed(album: Album) -> Embed:
title = cast(str, album.title)
track_count = album.track_count
artists = album.artists_name()
avail = cast(bool, album.available)
description = album.short_description
year = album.year
version = album.version
bests = album.bests
duration = album.duration_ms
title = album.title
explicit = album.explicit or album.content_warning
likes_count = album.likes_count
artist = album.artists[0]
cover_url = album.get_cover_url('400x400')
color = await _get_average_color_from_url(cover_url)
if isinstance(album.labels[0], Label):
labels = [cast(Label, label).name for label in album.labels]
else:
labels = [cast(str, label) for label in album.labels]
if version:
title += f' *{version}*'
if album.version and title:
title += f' *{album.version}*'
if explicit:
explicit_eid = getenv('EXPLICIT_EID')
if not explicit_eid:
raise ValueError('You must specify explicit emoji id in your enviroment.')
if explicit and title:
title += ' <:explicit:' + explicit_eid + '>'
artist_url = f"https://music.yandex.ru/artist/{artist.id}"
artist_cover = artist.cover
if not artist_cover and artist.op_image:
artist_cover_url = artist.get_op_image_url()
artist_cover_url = artist.get_op_image_url('400x400')
elif artist_cover:
artist_cover_url = artist_cover.get_url()
artist_cover_url = artist_cover.get_url(size='400x400')
else:
artist_cover_url = None
embed = Embed(
title=title,
description=description,
color=color,
description=album.short_description,
color=await _get_average_color_from_url(cover_url)
)
embed.set_thumbnail(url=cover_url)
embed.set_author(name=", ".join(artists), url=artist_url, icon_url=artist_cover_url)
embed.set_author(name=", ".join(album.artists_name()), url=artist_url, icon_url=artist_cover_url)
if year:
embed.add_field(name="Год выпуска", value=str(year))
if album.year:
embed.add_field(name="Год выпуска", value=str(album.year))
if duration:
duration_m = duration // 60000
duration_s = ceil(duration / 1000) - duration_m * 60
if duration_s == 60:
duration_m += 1
duration_s = 0
embed.add_field(name="Длительность", value=f"{duration_m}:{duration_s:02}")
if isinstance(album.duration_ms, int):
embed.add_field(name="Длительность", value=_format_duration(album.duration_ms))
if track_count is not None:
if track_count > 1:
embed.add_field(name="Треки", value=str(track_count))
else:
embed.add_field(name="Треки", value="Сингл")
if album.track_count is not None:
embed.add_field(name="Треки", value=str(album.track_count) if album.track_count > 1 else "Сингл")
if likes_count:
embed.add_field(name="Лайки", value=str(likes_count))
if album.likes_count is not None:
embed.add_field(name="Лайки", value=str(album.likes_count))
if len(labels) > 1:
embed.add_field(name="Лейблы", value=", ".join(labels))
else:
embed.add_field(name="Лейбл", value=", ".join(labels))
embed.add_field(name="Лейблы" if len(labels) > 1 else "Лейбл", value=", ".join(labels))
if not avail:
if not (album.available or album.available_for_premium_users):
embed.set_footer(text=f"Альбом в данный момент недоступен.")
return embed
async def _generate_artist_embed(artist: Artist) -> Embed:
name = cast(str, artist.name)
likes_count = artist.likes_count
avail = cast(bool, artist.available)
counts = artist.counts
description = artist.description
ratings = artist.ratings
popular_tracks = artist.popular_tracks
if not artist.cover:
cover_url = artist.get_op_image_url('400x400')
else:
cover_url = artist.cover.get_url(size='400x400')
color = await _get_average_color_from_url(cover_url)
embed = Embed(
title=name,
description=description.text if description else None,
color=color,
title=artist.name,
description=artist.description.text if artist.description else None,
color=await _get_average_color_from_url(cover_url)
)
embed.set_thumbnail(url=cover_url)
if likes_count:
embed.add_field(name="Лайки", value=str(likes_count))
if artist.likes_count:
embed.add_field(name="Лайки", value=str(artist.likes_count))
# if ratings:
# embed.add_field(name="Слушателей за месяц", value=str(ratings.month)) # Wrong numbers?
# embed.add_field(name="Слушателей за месяц", value=str(ratings.month)) # Wrong numbers
if counts:
embed.add_field(name="Треки", value=str(counts.tracks))
if artist.counts:
embed.add_field(name="Треки", value=str(artist.counts.tracks))
embed.add_field(name="Альбомы", value=str(counts.direct_albums))
embed.add_field(name="Альбомы", value=str(artist.counts.direct_albums))
if artist.genres:
genres = [genre.capitalize() for genre in artist.genres]
@@ -261,23 +206,12 @@ async def _generate_artist_embed(artist: Artist) -> Embed:
else:
embed.add_field(name="Жанр", value=", ".join(genres))
if not avail:
if not artist.available or artist.reason:
embed.set_footer(text=f"Артист в данный момент недоступен.")
return embed
async def _generate_playlist_embed(playlist: Playlist) -> Embed:
title = cast(str, playlist.title)
track_count = playlist.track_count
avail = cast(bool, playlist.available)
description = playlist.description
year = playlist.created
modified = playlist.modified
duration = playlist.duration_ms
likes_count = playlist.likes_count
cover_url = None
if playlist.cover and playlist.cover.uri:
cover_url = f"https://{playlist.cover.uri.replace('%%', '400x400')}"
else:
@@ -287,6 +221,8 @@ async def _generate_playlist_embed(playlist: Playlist) -> Embed:
if track and track.albums and track.albums[0].cover_uri:
cover_url = f"https://{track.albums[0].cover_uri.replace('%%', '400x400')}"
break
else:
cover_url = None
if cover_url:
color = await _get_average_color_from_url(cover_url)
@@ -294,33 +230,28 @@ async def _generate_playlist_embed(playlist: Playlist) -> Embed:
color = 0x000
embed = Embed(
title=title,
description=description,
color=color,
title=playlist.title,
description=playlist.description,
color=color
)
embed.set_thumbnail(url=cover_url)
if year:
embed.add_field(name="Год создания", value=str(year).split('-')[0])
if playlist.created:
embed.add_field(name="Год создания", value=str(playlist.created).split('-')[0])
if modified:
embed.add_field(name="Изменён", value=str(modified).split('-')[0])
if playlist.modified:
embed.add_field(name="Изменён", value=str(playlist.modified).split('-')[0])
if duration:
duration_m = duration // 60000
duration_s = ceil(duration / 1000) - duration_m * 60
if duration_s == 60:
duration_m += 1
duration_s = 0
embed.add_field(name="Длительность", value=f"{duration_m}:{duration_s:02}")
if playlist.duration_ms:
embed.add_field(name="Длительность", value=_format_duration(playlist.duration_ms))
if track_count is not None:
embed.add_field(name="Треки", value=str(track_count))
if playlist.track_count is not None:
embed.add_field(name="Треки", value=str(playlist.track_count))
if likes_count:
embed.add_field(name="Лайки", value=str(likes_count))
if playlist.likes_count:
embed.add_field(name="Лайки", value=str(playlist.likes_count))
if not avail:
if not playlist.available:
embed.set_footer(text=f"Плейлист в данный момент недоступен.")
return embed
@@ -358,5 +289,13 @@ async def _get_average_color_from_url(url: str) -> int:
b = b_total // count
return (r << 16) + (g << 8) + b
except Exception:
except (aiohttp.ClientError, IOError, ValueError):
return 0x000
def _format_duration(duration_ms: int) -> str:
duration_m = duration_ms // 60000
duration_s = ceil(duration_ms / 1000) - duration_m * 60
if duration_s == 60:
duration_m += 1
duration_s = 0
return f"{duration_m}:{duration_s:02}"

View File

@@ -12,7 +12,7 @@ from discord.ui import View
from discord import Interaction, ApplicationContext, RawReactionActionEvent, VoiceChannel
from MusicBot.cogs.utils import generate_item_embed
from MusicBot.database import VoiceGuildsDatabase, BaseUsersDatabase, ExplicitGuild, ExplicitUser, MessageVotes
from MusicBot.database import VoiceGuildsDatabase, BaseUsersDatabase, ExplicitGuild, MessageVotes
menu_views: dict[int, View] = {} # Store menu views and delete them when needed to prevent memory leaks for after callbacks.
@@ -24,11 +24,11 @@ class VoiceExtension:
self.users_db = BaseUsersDatabase()
async def send_menu_message(self, ctx: ApplicationContext | Interaction | RawReactionActionEvent, *, disable: bool = False) -> bool:
"""Send menu message to the channel and delete old menu message if exists. Return True if sent.
"""Send menu message to the channel and delete old one if exists. Return True if sent.
Args:
ctx (ApplicationContext | Interaction | RawReactionActionEvent): Context.
disable (bool, optional): Disable menu message. Defaults to False.
disable (bool, optional): Disable menu message buttons. Defaults to False.
Raises:
ValueError: If bot instance is not set and ctx is RawReactionActionEvent.
@@ -44,10 +44,11 @@ class VoiceExtension:
guild = await self.db.get_guild(ctx.guild_id, projection={'current_track': 1, 'current_menu': 1, 'vibing': 1})
if guild['current_track']:
if not (vc := await self.get_voice_client(ctx)):
return False
if not guild['current_track']:
embed = None
elif not (vc := await self.get_voice_client(ctx)):
return False
else:
track = cast(Track, Track.de_json(
guild['current_track'],
client=YMClient() # type: ignore
@@ -58,32 +59,29 @@ class VoiceExtension:
embed.set_footer(text='Приостановлено')
else:
embed.remove_footer()
else:
embed = None
if guild['current_menu']:
logging.info(f"[VC_EXT] Deleting old menu message {guild['current_menu']} in guild {ctx.guild_id}")
message = await self.get_menu_message(ctx, guild['current_menu'])
if message:
if (message := await self.get_menu_message(ctx, guild['current_menu'])):
await message.delete()
await self._update_menu_views_dict(ctx, disable=disable)
if isinstance(ctx, (ApplicationContext, Interaction)):
interaction = await ctx.respond(view=menu_views[ctx.guild_id], embed=embed)
else:
if not self.bot:
raise ValueError("Bot instance is not set.")
channel = cast(VoiceChannel, self.bot.get_channel(ctx.channel_id))
if not channel:
logging.warning(f"[VC_EXT] Channel {ctx.channel_id} not found in guild {ctx.guild_id}")
return False
elif not self.bot:
raise ValueError("Bot instance is not set.")
elif not (channel := self.bot.get_channel(ctx.channel_id)):
logging.warning(f"[VC_EXT] Channel {ctx.channel_id} not found in guild {ctx.guild_id}")
return False
elif isinstance(channel, discord.VoiceChannel):
interaction = await channel.send(
view=menu_views[ctx.guild_id],
embed=embed # type: ignore # Wrong typehints.
)
else:
logging.warning(f"[VC_EXT] Channel {ctx.channel_id} is not a voice channel in guild {ctx.guild_id}")
return False
response = await interaction.original_response() if isinstance(interaction, discord.Interaction) else interaction
await self.db.update(ctx.guild_id, {'current_menu': response.id})
@@ -113,12 +111,10 @@ class VoiceExtension:
menu = await ctx.fetch_message(menu_mid)
elif isinstance(ctx, Interaction):
menu = ctx.client.get_message(menu_mid)
elif isinstance(ctx, RawReactionActionEvent):
if not self.bot:
raise ValueError("Bot instance is not set.")
menu = self.bot.get_message(menu_mid)
elif not self.bot:
raise ValueError("Bot instance is not set.")
else:
raise ValueError(f"Invalid context type: '{type(ctx).__name__}'.")
menu = self.bot.get_message(menu_mid)
except discord.DiscordException as e:
logging.debug(f"[VC_EXT] Failed to get menu message: {e}")
await self.db.update(ctx.guild_id, {'current_menu': None})
@@ -167,6 +163,7 @@ class VoiceExtension:
guild = await self.db.get_guild(gid, projection={'vibing': 1, 'current_menu': 1, 'current_track': 1})
if not guild['current_menu']:
logging.debug("[VC_EXT] No current menu found")
return False
menu_message = await self.get_menu_message(ctx, guild['current_menu']) if not menu_message else menu_message
@@ -285,7 +282,7 @@ class VoiceExtension:
uid = viber_id if viber_id else ctx.user_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.user.id if ctx.user else None
if not uid or not gid:
logging.warning("[VC_EXT] Guild ID or User ID not found in context inside 'vibe_update'")
logging.warning("[VC_EXT] Guild ID or User ID not found in context")
return False
user = await self.users_db.get_user(uid, projection={'ym_token': 1, 'vibe_settings': 1})
@@ -307,27 +304,23 @@ class VoiceExtension:
)
if not guild['vibing']:
feedback = await client.rotor_station_feedback_radio_started(
f"{type}:{id}",
f"desktop-user-{client.me.account.uid}", # type: ignore # That's made up, but it doesn't do much anyway.
)
try:
feedback = await client.rotor_station_feedback_radio_started(
f"{type}:{id}",
f"desktop-user-{client.me.account.uid}", # type: ignore # That's made up, but it doesn't do much anyway.
)
except yandex_music.exceptions.BadRequestError as e:
logging.info(f"[VIBE] Bad request error while starting radio: {e}")
return False
if not feedback:
logging.warning(f"[VIBE] Failed to start radio '{type}:{id}'")
return False
logging.debug(f"[VIBE] Successfully started radio '{type}:{id}'")
if guild['current_track']:
logging.debug("[VIBE] Getting next vibe tracks")
# Current track here is either the track used to start vibe or the last vibe track played.
# So we always set the current track as the last track in the queue.
tracks = await client.rotor_station_tracks(
f"{type}:{id}",
queue=guild['current_track']['id']
)
else:
tracks = await client.rotor_station_tracks(f"{type}:{id}")
tracks = await client.rotor_station_tracks(
f"{type}:{id}",
queue=guild['current_track']['id'] if guild['current_track'] else None # type: ignore
)
if not tracks:
logging.warning("[VIBE] Failed to get next vibe tracks")
@@ -430,91 +423,46 @@ class VoiceExtension:
async def play_track(
self,
ctx: ApplicationContext | Interaction | RawReactionActionEvent,
track: Track,
track: Track | dict[str, Any],
*,
client: YMClient | None = None,
vc: discord.VoiceClient | None = None,
menu_message: discord.Message | None = None,
button_callback: bool = False,
retry: bool = False
) -> str | None:
"""Download ``track`` by its id and play it in the voice channel. Return track title on success.
Send vibe feedback for playing track if vibing. Should be called when voice requirements are met.
"""Play `track` in the voice channel. Avoids additional vibe feedback used in `next_track` and `previous_track`.
Forms ym_track and stops playback if needed. Returns track title on success.
Args:
ctx (ApplicationContext | Interaction | RawReactionActionEvent): Context.
track (Track): Track to play.
vc (discord.VoiceClient | None): Voice client.
menu_message (discord.Message | None): Menu message. If None, fetches menu from channel using message id from database. Defaults to None.
button_callback (bool): Should be True if the function is being called from button callback. Defaults to False.
retry (bool): Whether the function is called again.
track (dict[str, Any]): Track to play.
vc (discord.VoiceClient | None, optional): Voice client. Defaults to None.
menu_message (discord.Message | None, optional): Menu message to update. Defaults to None.
button_callback (bool, optional): Should be True if the function is being called from button callback. Defaults to False.
Returns:
(str | None): Song title or None.
"""
gid = ctx.guild_id
uid = ctx.user_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.user.id if ctx.user else None
if not gid or not uid:
logging.warning("Guild ID or User ID not found in context")
return None
guild = await self.db.get_guild(gid, projection={'current_menu': 1, 'vibing': 1, 'current_track': 1})
vc = await self.get_voice_client(ctx) if not vc else vc
if not vc:
vc = await self.get_voice_client(ctx)
if not await self.stop_playing(ctx, vc=vc):
return None
try:
if not guild['current_track'] or track.id != guild['current_track']['id']:
await self._download_track(gid, track)
except yandex_music.exceptions.TimedOutError:
logging.warning(f"[VC_EXT] Timed out while downloading track '{track.title}'")
if isinstance(track, dict):
track = cast(Track, Track.de_json(
track,
client=await self.init_ym_client(ctx) if not client else client # type: ignore # Async client can be used here.
))
if not isinstance(ctx, RawReactionActionEvent) and ctx.channel:
channel = cast(discord.VoiceChannel, ctx.channel)
elif self.bot and isinstance(ctx, RawReactionActionEvent):
channel = cast(discord.VoiceChannel, self.bot.get_channel(ctx.channel_id))
if not retry:
return await self.play_track(ctx, track, vc=vc, button_callback=button_callback, retry=True)
logging.error(f"[VC_EXT] Failed to download track '{track.title}'")
await channel.send(f"😔 Не удалось загрузить трек. Попробуйте сбросить меню.", delete_after=15)
return None
async with aiofiles.open(f'music/{gid}.mp3', "rb") as f:
track_bytes = io.BytesIO(await f.read())
song = discord.FFmpegPCMAudio(track_bytes, pipe=True, options='-vn -b:a 64k -filter:a "volume=0.15"')
await self.db.set_current_track(gid, track)
if menu_message or guild['current_menu']:
# Updating menu message before playing to prevent delay and avoid FFMPEG lags.
await self.update_menu_full(ctx, menu_message=menu_message, button_callback=button_callback)
if not guild['vibing']:
# Giving FFMPEG enough time to process the audio file
await asyncio.sleep(1)
loop = self._get_current_event_loop(ctx)
try:
vc.play(song, after=lambda exc: asyncio.run_coroutine_threadsafe(self.next_track(ctx, after=True), loop))
except discord.errors.ClientException as e:
logging.error(f"[VC_EXT] Error while playing track '{track.title}': {e}")
if not isinstance(ctx, RawReactionActionEvent):
await ctx.respond(f"Не удалось проиграть трек. Попробуйте сбросить меню.", delete_after=15, ephemeral=True)
elif self.bot:
channel = cast(discord.VoiceChannel, self.bot.get_channel(ctx.channel_id))
await channel.send(f"Не удалось проиграть трек. Попробуйте сбросить меню.", delete_after=15)
return None
logging.info(f"[VC_EXT] Playing track '{track.title}'")
await self.db.update(gid, {'is_stopped': False})
if guild['vibing']:
await self._my_vibe_start_feedback(ctx, track, uid)
return track.title
return await self._play_track(
ctx,
track,
vc=vc,
menu_message=menu_message,
button_callback=button_callback
)
async def stop_playing(
self,
@@ -523,12 +471,13 @@ class VoiceExtension:
vc: discord.VoiceClient | None = None,
full: bool = False
) -> bool:
"""Stop playing music in the voice channel. Required to play next one. Returns True on success.
"""Stop playing music in the voice channel and send vibe feedback.
Required to play next track. Returns True on success.
Args:
ctx (ApplicationContext | Interaction | RawReactionActionEvent): Context.
vc (discord.VoiceClient | None, optional): Voice client. Defaults to None.
full (bool, optional): Full check includes menu deletion and vibe feedback. Defaults to False.
full (bool, optional): Full check includes menu deletion. Defaults to False.
Returns:
bool: Whether the playback was stopped.
@@ -543,7 +492,6 @@ class VoiceExtension:
return False
guild = await self.db.get_guild(gid, projection={'current_menu': 1, 'current_track': 1, 'vibing': 1})
user = await self.users_db.get_user(uid, projection={'vibe_type': 1, 'vibe_id': 1, 'vibe_batch_id': 1, 'ym_token': 1})
vc = await self.get_voice_client(ctx) if not vc else vc
if not vc:
@@ -553,12 +501,13 @@ class VoiceExtension:
vc.stop()
if full:
if not await self._full_stop(ctx, guild, gid):
return False
if guild['vibing'] and guild['current_track']:
if not await self._my_vibe_stop_feedback(ctx, guild, user):
return False
await self.send_vibe_feedback(ctx, 'trackFinished', guild['current_track'])
if not guild['current_menu']:
return True
return await self._full_stop(ctx, guild['current_menu'], gid)
return True
@@ -568,6 +517,7 @@ class VoiceExtension:
vc: discord.VoiceClient | None = None,
*,
after: bool = False,
client: YMClient | None = None,
menu_message: discord.Message | None = None,
button_callback: bool = False
) -> str | None:
@@ -578,8 +528,9 @@ class VoiceExtension:
ctx (ApplicationContext | Interaction | RawReactionActionEvent): Context
vc (discord.VoiceClient, optional): Voice client.
after (bool, optional): Whether the function is being called by the after callback. Defaults to False.
client (YMClient | None, optional): Yandex Music client. Defaults to None.
menu_message (discord.Message | None): Menu message. If None, fetches menu from channel using message id from database. Defaults to None.
button_interaction (bool, optional): Should be True if the function is being called from button callback. Defaults to False.
button_callback (bool, optional): Should be True if the function is being called from button callback. Defaults to False.
Returns:
(str | None): Track title or None.
@@ -595,18 +546,9 @@ class VoiceExtension:
guild = await self.db.get_guild(gid, projection={'shuffle': 1, 'repeat': 1, 'is_stopped': 1, 'current_menu': 1, 'vibing': 1, 'current_track': 1})
user = await self.users_db.get_user(uid)
client = await self.init_ym_client(ctx, user['ym_token'])
vc = await self.get_voice_client(ctx) if not vc else vc
if guild['is_stopped'] and after:
logging.debug("[VC_EXT] Playback is stopped, skipping after callback...")
return None
if not client:
return None
if not vc: # Silently return if bot got kicked
logging.debug("[VC_EXT] Voice client not found in 'next_track'")
logging.debug("[VC_EXT] Playback is stopped, skipping after callback.")
return None
if guild['current_track'] and guild['current_menu'] and not guild['repeat']:
@@ -617,14 +559,12 @@ class VoiceExtension:
await self.update_menu_view(ctx, menu_message=menu_message, disable=True)
if guild['vibing'] and guild['current_track']:
if not await self._my_vibe_feedback(ctx, guild, user, client, after=after):
if not await self.send_vibe_feedback(ctx, 'trackFinished' if after else 'skip', guild['current_track']):
if not isinstance(ctx, RawReactionActionEvent):
await ctx.respond("Что-то пошло не так. Попробуйте снова.", ephemeral=True)
await ctx.respond("Не удалось отправить отчёт об оконачнии Моей Волны.", ephemeral=True, delete_after=15)
elif self.bot:
channel = cast(discord.VoiceChannel, self.bot.get_channel(ctx.channel_id))
await channel.send("Что-то пошло не так. Попробуйте снова.", delete_after=15)
return None
await channel.send("Не удалось отправить отчёт об оконачнии Моей Волны.", delete_after=15)
if guild['repeat'] and after:
logging.debug("[VC_EXT] Repeating current track")
@@ -635,7 +575,7 @@ class VoiceExtension:
else:
logging.debug("[VC_EXT] Getting next track from queue")
next_track = await self.db.get_track(gid, 'next')
if not next_track and guild['vibing']:
logging.debug("[VC_EXT] No next track found, generating new vibe")
if not user['vibe_type'] or not user['vibe_id']:
@@ -646,7 +586,7 @@ class VoiceExtension:
next_track = await self.db.get_track(gid, 'next')
if next_track:
title = await self._play_track(ctx, next_track, client=client, vc=vc, button_callback=button_callback)
title = await self.play_track(ctx, next_track, client=client, vc=vc, button_callback=button_callback)
if after and not guild['current_menu']:
if isinstance(ctx, discord.RawReactionActionEvent):
@@ -668,7 +608,7 @@ class VoiceExtension:
async def previous_track(self, ctx: ApplicationContext | Interaction | RawReactionActionEvent, button_callback: bool = False) -> str | None:
"""Switch to the previous track in the queue. Repeat current track if no previous one found.
Return track title on success.
Return track title on success. Should be called only if there's already track playing.
Args:
ctx (ApplicationContext | Interaction | RawReactionActionEvent): Context.
@@ -700,7 +640,7 @@ class VoiceExtension:
track = None
if track:
return await self._play_track(ctx, track, button_callback=button_callback)
return await self.play_track(ctx, track, button_callback=button_callback)
return None
@@ -713,17 +653,15 @@ class VoiceExtension:
Returns:
(list[Track] | None): List of tracks or None.
"""
gid = ctx.guild_id
uid = ctx.user_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.user.id if ctx.user else None
logging.info("[VC_EXT] Getting liked tracks")
if not gid or not uid:
logging.warning("Guild ID or User ID not found in context inside 'play_track'")
if not ctx.guild_id:
logging.warning("Guild ID not found in context inside 'get_likes'")
return None
current_track = await self.db.get_track(gid, 'current')
client = await self.init_ym_client(ctx, await self.users_db.get_ym_token(uid))
client = await self.init_ym_client(ctx)
if not current_track:
if not await self.db.get_track(ctx.guild_id, 'current'):
logging.debug("[VC_EXT] Current track not found in 'get_likes'")
return None
@@ -732,7 +670,7 @@ class VoiceExtension:
likes = await client.users_likes_tracks()
if not likes:
logging.debug("[VC_EXT] No likes found")
logging.info("[VC_EXT] No likes found")
return None
return likes.tracks
@@ -797,7 +735,8 @@ class VoiceExtension:
Returns:
(YMClient | None): Client or None.
"""
logging.debug("[VC_EXT] Initializing Yandex Music client")
if not token:
uid = ctx.user_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.user.id if ctx.user else None
token = await self.users_db.get_ym_token(uid) if uid else None
@@ -807,19 +746,22 @@ class VoiceExtension:
if not isinstance(ctx, discord.RawReactionActionEvent):
await ctx.respond("❌ Укажите токен через /account login.", delete_after=15, ephemeral=True)
return None
if not hasattr(self, '_ym_clients'):
self._ym_clients = {}
self._ym_clients: dict[str, YMClient] = {}
if token in self._ym_clients:
return self._ym_clients[token]
client = self._ym_clients[token]
try:
await client.account_status()
return client
except yandex_music.exceptions.UnauthorizedError:
del self._ym_clients[token]
return None
try:
client = await YMClient(token).init()
except yandex_music.exceptions.UnauthorizedError:
logging.debug("UnauthorizedError in 'init_ym_client'")
if not isinstance(ctx, discord.RawReactionActionEvent):
await ctx.respond("❌ Недействительный токен. Если это не так, попробуйте ещё раз.", delete_after=15, ephemeral=True)
return None
self._ym_clients[token] = client
@@ -843,8 +785,8 @@ class VoiceExtension:
logging.warning("[VOICE] Guild not found")
return False
if not guild['current_menu']:
await self.send_menu_message(ctx)
if not guild['current_menu'] and not await self.send_menu_message(ctx):
await channel.send(content=f"Не удалось отправить меню! Попробуйте ещё раз.", delete_after=15)
if vote_data['action'] in ('next', 'previous'):
if not guild.get(f'{vote_data['action']}_tracks'):
@@ -916,29 +858,86 @@ class VoiceExtension:
await channel.send("❌ Произошла ошибка при обновлении станции.", delete_after=15)
return False
feedback = await self.update_vibe(ctx, _type, _id, viber_id=viber_id)
if not feedback:
if not await self.update_vibe(ctx, _type, _id, viber_id=viber_id):
await channel.send("❌ Операция не удалась. Возможно, у вес нет подписки на Яндекс Музыку.", delete_after=15)
return False
next_track = await self.db.get_track(ctx.guild_id, 'next')
if next_track:
await self._play_track(ctx, next_track)
await self.play_track(ctx, next_track)
else:
await channel.send("Не удалось воспроизвести трек.", delete_after=15)
return False
else:
logging.error(f"[VOICE] Unknown action '{vote_data['action']}' for message {ctx.message_id}")
return False
return True
async def send_vibe_feedback(
self,
ctx: ApplicationContext | Interaction | RawReactionActionEvent,
feedback_type: Literal['radioStarted', 'trackStarted', 'trackFinished', 'skip'],
track: Track | dict[str, Any]
) -> bool:
"""Send vibe feedback to Yandex Music. Return True on success.
Args:
ctx (ApplicationContext | Interaction | RawReactionActionEvent): Context.
feedback_type (str): Type of feedback. Can be 'radioStarted', 'trackStarted', 'trackFinished', 'skip'.
track (Track | dict[str, Any]): Track data.
Returns:
bool: True on success, False otherwise.
"""
logging.debug(f"[VC_EXT] Sending vibe feedback, type: {feedback_type}")
uid = ctx.user_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.user.id if ctx.user else None
if not uid:
logging.warning("[VC_EXT] User id not found")
return False
user = await self.users_db.get_user(uid, projection={'ym_token': 1, 'vibe_batch_id': 1, 'vibe_type': 1, 'vibe_id': 1})
if not user['ym_token']:
logging.warning(f"[VC_EXT] No YM token for user {user['_id']}.")
return False
client = await self.init_ym_client(ctx, user['ym_token'])
if not client:
logging.info(f"[VC_EXT] Failed to init YM client for user {user['_id']}")
if not isinstance(ctx, RawReactionActionEvent):
await ctx.respond("❌ Что-то пошло не так. Попробуйте позже.", delete_after=15, ephemeral=True)
elif self.bot:
channel = cast(discord.VoiceChannel, self.bot.get_channel(ctx.channel_id))
await channel.send("❌ Что-то пошло не так. Попробуйте позже.", delete_after=15)
return False
total_play_seconds = track['duration_ms'] // 1000 if feedback_type not in ('radioStarted', 'trackStarted') and track['duration_ms'] else None
try:
feedback = await client.rotor_station_feedback(
f'{user['vibe_type']}:{user['vibe_id']}',
feedback_type,
track_id=track['id'],
total_played_seconds=total_play_seconds, # type: ignore
batch_id=user['vibe_batch_id'] # type: ignore
)
except yandex_music.exceptions.BadRequestError as e:
logging.error(f"[VC_EXT] Failed to send vibe feedback, type: {feedback_type}, track: {track['title']} error: {e}")
return False
logging.info(f"[VC_EXT] Sent vibe feedback type '{feedback_type}' with result: {feedback}")
return feedback
async def _update_menu_views_dict(
self,
ctx: ApplicationContext | Interaction | RawReactionActionEvent,
*,
disable: bool = False
) -> None:
"""Update menu views in `menu_views` dict. This prevents creating multiple menu views for the same guild.
"""Genereate a new menu view and update the `menu_views` dict. This prevents creating multiple menu views for the same guild.
Use guild id as a key to access menu view.
Args:
@@ -968,10 +967,10 @@ class VoiceExtension:
try:
await track.download_async(f'music/{gid}.mp3')
except yandex_music.exceptions.TimedOutError:
logging.warning(f"[VC_EXT] Timeout downloading {track.title}")
logging.warning(f"[VC_EXT] Timed out while downloading track '{track.title}'")
raise
async def _full_stop(self, ctx: ApplicationContext | Interaction | RawReactionActionEvent, guild: ExplicitGuild, gid: int) -> Literal[True]:
async def _full_stop(self, ctx: ApplicationContext | Interaction | RawReactionActionEvent, current_menu: int, gid: int) -> Literal[True]:
"""Stop all actions and delete menu. Return True on success.
Args:
@@ -986,195 +985,106 @@ class VoiceExtension:
if gid in menu_views:
menu_views[gid].stop()
del menu_views[gid]
if guild['current_menu']:
menu = await self.get_menu_message(ctx, guild['current_menu'])
if menu:
await menu.delete()
if (menu := await self.get_menu_message(ctx, current_menu)):
await menu.delete()
await self.db.update(gid, {
'current_menu': None, 'repeat': False, 'shuffle': False, 'previous_tracks': [], 'next_tracks': [], 'votes': {}, 'vibing': False
})
return True
async def _my_vibe_start_feedback(self, ctx: ApplicationContext | Interaction | RawReactionActionEvent, track: Track, uid: int):
"""Send vibe start feedback to Yandex Music. Return True on success.
Args:
ctx (ApplicationContext | Interaction | RawReactionActionEvent): Context.
track (Track): Track.
uid (int): User ID.
Returns:
bool: True on success, False otherwise.
"""
user = await self.users_db.get_user(uid)
client = await self.init_ym_client(ctx, user['ym_token']) if not track.client else track.client
if not client:
logging.warning(f"[VOICE] No YM client for user {uid}.")
return False
feedback = await client.rotor_station_feedback_track_started(
f"{user['vibe_type']}:{user['vibe_id']}",
track.id,
user['vibe_batch_id'] # type: ignore # Wrong typehints
)
logging.debug(f"[VIBE] Track started feedback: {feedback}")
return True
async def _my_vibe_stop_feedback(
self,
ctx: ApplicationContext | Interaction | RawReactionActionEvent,
guild: ExplicitGuild,
user: ExplicitUser
) -> bool:
"""Send vibe stop feedback to Yandex Music. Return True on success.
Args:
ctx (ApplicationContext | Interaction | RawReactionActionEvent): Context.
guild (ExplicitGuild): Guild.
user (ExplicitUser): User.
Returns:
bool: True on success, False otherwise.
"""
logging.debug("[VC_EXT] Sending vibe stop feedback")
if not user['ym_token']:
logging.warning(f"[VOICE] No YM token for user {user['_id']}.")
return False
client = await self.init_ym_client(ctx, user['ym_token'])
if not client:
logging.info(f"[VOICE] Failed to init YM client for user {user['_id']}")
if not isinstance(ctx, RawReactionActionEvent):
await ctx.respond("❌ Что-то пошло не так. Попробуйте позже.", delete_after=15, ephemeral=True)
elif self.bot:
channel = cast(discord.VoiceChannel, self.bot.get_channel(ctx.channel_id))
await channel.send("❌ Что-то пошло не так. Попробуйте позже.", delete_after=15)
return False
track = guild['current_track']
if not track:
logging.info(f"[VOICE] No current track in guild {guild['_id']}")
return False
res = await client.rotor_station_feedback_track_finished(
f"{user['vibe_type']}:{user['vibe_id']}",
track['id'],
track['duration_ms'] // 1000,
user['vibe_batch_id'] # type: ignore # Wrong typehints
)
logging.info(f"[VOICE] User {user['_id']} finished vibing with result: {res}")
return True
async def _my_vibe_feedback(
self,
ctx: ApplicationContext | Interaction | RawReactionActionEvent,
guild: ExplicitGuild,
user: ExplicitUser,
client: YMClient,
*,
after: bool
) -> bool:
"""Send vibe feedback to Yandex Music. If the track was skipped, call `update_vibe` to get next tracks.
This is called when a user skips a track or when a track finishes and not when a user stops the player.
Args:
ctx (ApplicationContext | Interaction | RawReactionActionEvent): Context.
guild (ExplicitGuild): Guild.
user (ExplicitUser): User.
client (YMClient): Yandex Music client.
after (bool): Whether the track finished or was skipped. If True, the track finished.
Returns:
bool: True on success, False otherwise.
"""
# TODO: Should be refactored to prevent duplication with `_my_vibe_stop_feedback` and `_my_vibe_start_feedback`
logging.debug(f"[VC_EXT] Sending vibe feedback, after: {after}")
if not user['vibe_type'] or not user['vibe_id']:
logging.warning("[VIBE] No vibe type or id found")
return False
if not guild['current_track']:
logging.warning("[VIBE] No current track found")
return False
if after:
feedback = await client.rotor_station_feedback_track_finished(
f'{user['vibe_type']}:{user['vibe_id']}',
guild['current_track']['id'],
guild['current_track']['duration_ms'] // 1000,
user['vibe_batch_id'] # type: ignore # Wrong typehints
)
logging.debug(f"[VIBE] Finished track feeedback: {feedback}")
else:
feedback = await client.rotor_station_feedback_skip(
f'{user['vibe_type']}:{user['vibe_id']}',
guild['current_track']['id'],
guild['current_track']['duration_ms'] // 1000,
user['vibe_batch_id'] # type: ignore # Wrong typehints
)
if not feedback:
logging.warning("[VIBE] Failed to send vibe feedback")
return False
logging.debug(f"[VIBE] Skipped track feeedback: {feedback}")
feedback = await self.update_vibe(
ctx,
user['vibe_type'],
user['vibe_id']
)
return feedback
async def _play_track(
self,
ctx: ApplicationContext | Interaction | RawReactionActionEvent,
track: dict[str, Any],
track: Track,
*,
client: YMClient | None = None,
vc: discord.VoiceClient | None = None,
menu_message: discord.Message | None = None,
button_callback: bool = False,
retry: bool = False
) -> str | None:
"""Play `track` in the voice channel. Avoids additional vibe checks used in `next_track` and `previous_track`.
"""Download ``track`` by its id and play it in the voice channel. Return track title on success.
Send vibe feedback for playing track if vibing. Should be called when voice requirements are met.
Args:
ctx (ApplicationContext | Interaction | RawReactionActionEvent): Context.
track (dict[str, Any]): Track to play.
vc (discord.VoiceClient | None, optional): Voice client. Defaults to None.
menu_message (discord.Message | None, optional): Menu message to update. Defaults to None.
button_callback (bool, optional): Should be True if the function is being called from button callback. Defaults to False.
track (Track): Track to play.
vc (discord.VoiceClient | None): Voice client.
menu_message (discord.Message | None): Menu message. If None, fetches menu from channel using message id from database. Defaults to None.
button_callback (bool): Should be True if the function is being called from button callback. Defaults to False.
retry (bool): Whether the function is called again.
Returns:
str | None: Song title or None.
(str | None): Song title or None.
"""
# TODO: This should be refactored to avoid code duplication with `next_track` and `previous_track`.
client = await self.init_ym_client(ctx) if not client else client
gid = ctx.guild_id
uid = ctx.user_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.user.id if ctx.user else None
if not client:
if not gid or not uid:
logging.warning("Guild ID or User ID not found in context")
return None
if not vc:
vc = await self.get_voice_client(ctx)
guild = await self.db.get_guild(gid, projection={'current_menu': 1, 'vibing': 1, 'current_track': 1})
if not await self.stop_playing(ctx, vc=vc):
if not (vc := await self.get_voice_client(ctx) if not vc else vc):
return None
ym_track = cast(Track, Track.de_json(
track,
client=client # type: ignore # Async client can be used here.
))
return await self.play_track(
ctx,
ym_track,
vc=vc,
menu_message=menu_message,
button_callback=button_callback
)
try:
if not guild['current_track'] or track.id != guild['current_track']['id']:
await self._download_track(gid, track)
except yandex_music.exceptions.TimedOutError:
if not isinstance(ctx, RawReactionActionEvent) and ctx.channel:
channel = cast(discord.VoiceChannel, ctx.channel)
elif not retry:
return await self._play_track(ctx, track, vc=vc, menu_message=menu_message, button_callback=button_callback, retry=True)
elif self.bot and isinstance(ctx, RawReactionActionEvent):
channel = cast(discord.VoiceChannel, self.bot.get_channel(ctx.channel_id))
logging.error(f"[VC_EXT] Failed to download track '{track.title}'")
await channel.send(f"😔 Не удалось загрузить трек. Попробуйте сбросить меню.", delete_after=15)
return None
async with aiofiles.open(f'music/{gid}.mp3', "rb") as f:
track_bytes = io.BytesIO(await f.read())
song = discord.FFmpegPCMAudio(track_bytes, pipe=True, options='-vn -b:a 64k -filter:a "volume=0.15"')
await self.db.set_current_track(gid, track)
if menu_message or guild['current_menu']:
# Updating menu message before playing to prevent delay and avoid FFMPEG lags.
await self.update_menu_full(ctx, menu_message=menu_message, button_callback=button_callback)
if not guild['vibing']:
# Giving FFMPEG enough time to process the audio file
await asyncio.sleep(1)
loop = self._get_current_event_loop(ctx)
try:
vc.play(song, after=lambda exc: asyncio.run_coroutine_threadsafe(self.next_track(ctx, after=True), loop))
except discord.errors.ClientException as e:
logging.error(f"[VC_EXT] Error while playing track '{track.title}': {e}")
if not isinstance(ctx, RawReactionActionEvent):
await ctx.respond(f"Не удалось проиграть трек. Попробуйте сбросить меню.", delete_after=15, ephemeral=True)
elif self.bot:
channel = cast(discord.VoiceChannel, self.bot.get_channel(ctx.channel_id))
await channel.send(f"Не удалось проиграть трек. Попробуйте сбросить меню.", delete_after=15)
return None
except yandex_music.exceptions.InvalidBitrateError:
logging.error(f"[VC_EXT] Invalid bitrate while playing track '{track.title}'")
if not isinstance(ctx, RawReactionActionEvent):
await ctx.respond(f"У трека отсутствует необходимый битрейт. Его проигрывание невозможно.", delete_after=15, ephemeral=True)
elif self.bot:
channel = cast(discord.VoiceChannel, self.bot.get_channel(ctx.channel_id))
await channel.send(f"У трека отсутствует необходимый битрейт. Его проигрывание невозможно.", delete_after=15)
return None
logging.info(f"[VC_EXT] Playing track '{track.title}'")
await self.db.update(gid, {'is_stopped': False})
if guild['vibing']:
await self.send_vibe_feedback(ctx, 'trackStarted', track)
return track.title
def _get_current_event_loop(self, ctx: ApplicationContext | Interaction | RawReactionActionEvent) -> asyncio.AbstractEventLoop:
"""Get the current event loop. If the context is a RawReactionActionEvent, get the loop from the self.bot instance.

View File

@@ -204,8 +204,8 @@ class Voice(Cog, VoiceExtension):
@voice.command(name="menu", description="Создать или обновить меню проигрывателя.")
async def menu(self, ctx: discord.ApplicationContext) -> None:
logging.info(f"[VOICE] Menu command invoked by user {ctx.author.id} in guild {ctx.guild.id}")
if await self.voice_check(ctx):
await self.send_menu_message(ctx)
if await self.voice_check(ctx) and not await self.send_menu_message(ctx):
await ctx.respond("Не удалось создать меню.", ephemeral=True)
@voice.command(name="join", description="Подключиться к голосовому каналу, в котором вы сейчас находитесь.")
async def join(self, ctx: discord.ApplicationContext) -> None:
@@ -213,17 +213,17 @@ class Voice(Cog, VoiceExtension):
member = cast(discord.Member, ctx.author)
guild = await self.db.get_guild(ctx.guild.id, projection={'allow_change_connect': 1})
vc = await self.get_voice_client(ctx)
await ctx.defer(ephemeral=True)
if not member.guild_permissions.manage_channels and not guild['allow_change_connect']:
response_message = "У вас нет прав для выполнения этой команды."
elif vc and vc.is_connected():
response_message = "❌ Бот уже находится в голосовом канале. Выключите его с помощью команды /voice leave."
elif isinstance(ctx.channel, discord.VoiceChannel):
try:
await ctx.channel.connect()
except TimeoutError:
response_message = "Не удалось подключиться к голосовому каналу."
except discord.ClientException:
response_message = "❌ Бот уже находится в голосовом канале. Выключите его с помощью команды /voice leave."
else:
response_message = "✅ Подключение успешно!"
else:
@@ -302,6 +302,10 @@ class Voice(Cog, VoiceExtension):
await self.users_db.update(ctx.user.id, {'queue_page': 0})
tracks = await self.db.get_tracks_list(ctx.guild.id, 'next')
if len(tracks) == 0:
await ctx.respond("❌ Очередь пуста.", ephemeral=True)
return
embed = generate_queue_embed(0, tracks)
await ctx.respond(embed=embed, view=await QueueView(ctx).init(), ephemeral=True)
@@ -340,6 +344,7 @@ class Voice(Cog, VoiceExtension):
)
return
await ctx.defer(ephemeral=True)
res = await self.stop_playing(ctx, full=True)
if res:
await ctx.respond("✅ Воспроизведение остановлено.", delete_after=15, ephemeral=True)
@@ -435,18 +440,16 @@ class Voice(Cog, VoiceExtension):
}
)
return
feedback = await self.update_vibe(ctx, _type, _id)
if not feedback:
if not await self.update_vibe(ctx, _type, _id):
await ctx.respond("❌ Операция не удалась. Возможно, у вес нет подписки на Яндекс Музыку.", delete_after=15, ephemeral=True)
return
if guild['current_menu']:
await ctx.respond("✅ Моя Волна включена.", delete_after=15, ephemeral=True)
else:
await self.send_menu_message(ctx, disable=True)
elif not await self.send_menu_message(ctx, disable=True):
await ctx.respond("Не удалось отправить меню. Попробуйте позже.", delete_after=15, ephemeral=True)
next_track = await self.db.get_track(ctx.guild_id, 'next')
if next_track:
await self._play_track(ctx, next_track)
await self.play_track(ctx, next_track)

View File

@@ -1,11 +1,12 @@
import os
import logging
from aiohttp import ClientSession
import discord
from discord.ext.commands import Bot
from discord.ext import tasks
intents = discord.Intents.default()
intents.message_content = True
bot = Bot(intents=intents)
cogs_list = [
@@ -19,6 +20,22 @@ async def on_ready():
logging.info("Bot's ready!")
await bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name="/voice vibe"))
@tasks.loop(seconds=3600)
async def update_server_count():
# Don't update server count in debug mode
if os.getenv('DEBUG') == 'True':
return
async with ClientSession() as session:
if token := os.getenv('PROMO_TOKEN_1'):
res = await session.post(
'https://api.server-discord.com/v2/bots/1325795708019806250/stats',
headers={'Authorization': token},
data={'servers': len(bot.guilds), 'shards': bot.shard_count or 1}
)
if not res.ok:
logging.error(f'Failed to update server count 1: {res.status} {await res.text()}')
if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()

View File

@@ -19,15 +19,18 @@ class PlayButton(Button, VoiceExtension):
logging.debug(f"[FIND] Callback triggered for type: '{type(self.item).__name__}'")
if not interaction.guild:
logging.warning("[FIND] No guild found in PlayButton callback")
logging.info("[FIND] No guild found in PlayButton callback")
await interaction.respond("❌ Эта команда доступна только на серверах.", ephemeral=True, delete_after=15)
return
if not await self.voice_check(interaction):
logging.debug("[FIND] Voice check failed in PlayButton callback")
return
gid = interaction.guild.id
guild = await self.db.get_guild(gid, projection={'current_track': 1, 'current_menu': 1, 'vote_add': 1})
guild = await self.db.get_guild(interaction.guild.id, projection={'current_track': 1, 'current_menu': 1, 'vote_add': 1, 'vibing': 1})
if guild['vibing']:
await interaction.respond("❌ Нельзя добавлять треки в очередь, пока запущена волна.", ephemeral=True, delete_after=15)
return
channel = cast(discord.VoiceChannel, interaction.channel)
member = cast(discord.Member, interaction.user)
@@ -41,7 +44,7 @@ class PlayButton(Button, VoiceExtension):
album = await self.item.with_tracks_async()
if not album or not album.volumes:
logging.debug("[FIND] Failed to fetch album tracks in PlayButton callback")
await interaction.respond("Не удалось получить треки альбома.", ephemeral=True)
await interaction.respond("Не удалось получить треки альбома.", ephemeral=True, delete_after=15)
return
tracks = [track for volume in album.volumes for track in volume]
@@ -53,7 +56,7 @@ class PlayButton(Button, VoiceExtension):
artist_tracks = await self.item.get_tracks_async()
if not artist_tracks:
logging.debug("[FIND] Failed to fetch artist tracks in PlayButton callback")
await interaction.respond("Не удалось получить треки артиста.", ephemeral=True)
await interaction.respond("Не удалось получить треки артиста.", ephemeral=True, delete_after=15)
return
tracks = artist_tracks.tracks.copy()
@@ -65,7 +68,7 @@ class PlayButton(Button, VoiceExtension):
short_tracks = await self.item.fetch_tracks_async()
if not short_tracks:
logging.debug("[FIND] Failed to fetch playlist tracks in PlayButton callback")
await interaction.respond("Не удалось получить треки из плейлиста.", delete_after=15)
await interaction.respond("Не удалось получить треки из плейлиста.", ephemeral=True, delete_after=15)
return
tracks = [cast(Track, short_track.track) for short_track in short_tracks]
@@ -77,7 +80,7 @@ class PlayButton(Button, VoiceExtension):
tracks = self.item.copy()
if not tracks:
logging.debug("[FIND] Empty tracks list in PlayButton callback")
await interaction.respond("Не удалось получить треки.", delete_after=15)
await interaction.respond("Не удалось получить треки.", ephemeral=True, delete_after=15)
return
action = 'add_playlist'
@@ -97,7 +100,7 @@ class PlayButton(Button, VoiceExtension):
await response.add_reaction('')
await self.db.update_vote(
gid,
interaction.guild.id,
response.id,
{
'positive_votes': list(),
@@ -109,21 +112,20 @@ class PlayButton(Button, VoiceExtension):
)
return
logging.debug(f"[FIND] Skipping vote for '{action}'")
if guild['current_menu']:
await interaction.respond(response_message, delete_after=15)
else:
await self.send_menu_message(interaction, disable=True)
elif not await self.send_menu_message(interaction, disable=True):
await interaction.respond('Не удалось отправить сообщение.', ephemeral=True, delete_after=15)
if guild['current_track'] is not None:
if guild['current_track']:
logging.debug(f"[FIND] Adding tracks to queue")
await self.db.modify_track(gid, tracks, 'next', 'extend')
await self.db.modify_track(interaction.guild.id, tracks, 'next', 'extend')
else:
logging.debug(f"[FIND] Playing track")
track = tracks.pop(0)
await self.db.modify_track(gid, tracks, 'next', 'extend')
await self.play_track(interaction, track)
await self.db.modify_track(interaction.guild.id, tracks, 'next', 'extend')
if not await self.play_track(interaction, track):
await interaction.respond('Не удалось воспроизвести трек.', ephemeral=True, delete_after=15)
if interaction.message:
await interaction.message.delete()
@@ -138,12 +140,17 @@ class MyVibeButton(Button, VoiceExtension):
async def callback(self, interaction: discord.Interaction):
logging.debug(f"[VIBE] Button callback for '{type(self.item).__name__}'")
if not await self.voice_check(interaction):
return
gid = interaction.guild_id
if not gid:
logging.warning(f"[VIBE] Guild ID is None in button callback")
if not interaction.guild_id or not interaction.user:
logging.warning(f"[VIBE] Guild ID or user is None in button callback")
return
guild = await self.db.get_guild(interaction.guild_id, projection={'current_menu': 1, 'vibing': 1})
if guild['vibing']:
await interaction.respond('❌ Волна уже запущена. Остановите её с помощью команды /voice stop.', ephemeral=True, delete_after=15)
return
track_type_map = {
@@ -153,7 +160,7 @@ class MyVibeButton(Button, VoiceExtension):
if isinstance(self.item, Playlist):
if not self.item.owner:
logging.warning(f"[VIBE] Playlist owner is None")
await interaction.respond("Не удалось получить информацию о плейлисте.", ephemeral=True)
await interaction.respond("Не удалось получить информацию о плейлисте. Отсутствует владелец.", ephemeral=True, delete_after=15)
return
_id = self.item.owner.login + '_' + str(self.item.kind)
@@ -162,16 +169,50 @@ class MyVibeButton(Button, VoiceExtension):
else:
_id = 'onyourwave'
await self.send_menu_message(interaction, disable=True)
await self.update_vibe(
interaction,
track_type_map[type(self.item)],
_id
)
member = cast(discord.Member, interaction.user)
channel = cast(discord.VoiceChannel, interaction.channel)
if len(channel.members) > 2 and not member.guild_permissions.manage_channels:
logging.info(f"Starting vote for starting vibe in guild {interaction.guild_id}")
next_track = await self.db.get_track(gid, 'next')
if next_track:
await self._play_track(interaction, next_track)
match self.item:
case Track():
response_message = f"{member.mention} хочет запустить волну по треку **{self.item['title']}**.\n\n Выполнить действие?"
case Album():
response_message = f"{member.mention} хочет запустить волну по альбому **{self.item['title']}**.\n\n Выполнить действие?"
case Artist():
response_message = f"{member.mention} хочет запустить волну по исполнителю **{self.item['name']}**.\n\n Выполнить действие?"
case Playlist():
response_message = f"{member.mention} хочет запустить волну по плейлисту **{self.item['title']}**.\n\n Выполнить действие?"
case list():
response_message = f"{member.mention} хочет запустить станцию **Моя Волна**.\n\n Выполнить действие?"
message = cast(discord.Interaction, await interaction.respond(response_message))
response = await message.original_response()
await response.add_reaction('')
await response.add_reaction('')
await self.db.update_vote(
interaction.guild_id,
response.id,
{
'positive_votes': list(),
'negative_votes': list(),
'total_members': len(channel.members),
'action': 'vibe_station',
'vote_content': [track_type_map[type(self.item)], _id, interaction.user.id]
}
)
return
if not guild['current_menu'] and not await self.send_menu_message(interaction, disable=True):
await interaction.respond('Не удалось отправить сообщение.', ephemeral=True, delete_after=15)
await self.update_vibe(interaction, track_type_map[type(self.item)], _id)
if (next_track := await self.db.get_track(interaction.guild_id, 'next')):
await self.play_track(interaction, next_track)
class ListenView(View):
def __init__(self, item: Track | Album | Artist | Playlist | list[Track], *items: Item, timeout: float | None = 360, disable_on_timeout: bool = True):
@@ -192,6 +233,7 @@ class ListenView(View):
link_web = f"https://music.yandex.ru/playlist/{item.playlist_uuid}"
elif isinstance(item, list): # Can't open other person's likes
self.add_item(PlayButton(item, label="Слушать в голосовом канале", style=ButtonStyle.gray))
self.add_item(MyVibeButton(item, label="Моя Волна", style=ButtonStyle.gray, emoji="🌊", row=1))
return
self.button1: Button = Button(label="Слушать в приложении", style=ButtonStyle.gray, url=link_app, row=0)
@@ -208,6 +250,6 @@ class ListenView(View):
async def on_timeout(self) -> None:
try:
return await super().on_timeout()
except discord.NotFound:
except discord.HTTPException:
pass
self.stop()

View File

@@ -2,7 +2,10 @@ import logging
from typing import Self, cast
from discord.ui import View, Button, Item, Select
from discord import VoiceChannel, ButtonStyle, Interaction, ApplicationContext, RawReactionActionEvent, Embed, ComponentType, SelectOption, Member
from discord import (
Interaction, ApplicationContext, RawReactionActionEvent,
VoiceChannel, ButtonStyle, Embed, ComponentType, SelectOption, Member, HTTPException
)
import yandex_music.exceptions
from yandex_music import TrackLyrics, Playlist, ClientAsync as YMClient
@@ -275,11 +278,45 @@ class MyVibeButton(Button, VoiceExtension):
if not await self.voice_check(interaction):
return
if not interaction.guild_id:
logging.warning('[MENU] No guild id in button callback')
if not interaction.guild_id or not interaction.user:
logging.warning('[MENU] No guild id or user in button callback')
return
member = cast(Member, interaction.user)
channel = cast(VoiceChannel, interaction.channel)
track = await self.db.get_track(interaction.guild_id, 'current')
if len(channel.members) > 2 and not member.guild_permissions.manage_channels:
logging.info(f"Starting vote for starting vibe in guild {interaction.guild_id}")
if track:
response_message = f"{member.mention} хочет запустить волну по треку **{track['title']}**.\n\n Выполнить действие?"
_type = 'track'
_id = track['id']
else:
response_message = f"{member.mention} хочет запустить станцию **Моя Волна**.\n\n Выполнить действие?"
_type = 'user'
_id = 'onyourwave'
message = cast(Interaction, await interaction.respond(response_message))
response = await message.original_response()
await response.add_reaction('')
await response.add_reaction('')
await self.db.update_vote(
interaction.guild_id,
response.id,
{
'positive_votes': list(),
'negative_votes': list(),
'total_members': len(channel.members),
'action': 'vibe_station',
'vote_content': [_type, _id, interaction.user.id]
}
)
return
track = await self.db.get_track(interaction.guild_id, 'current')
if track:
logging.info(f"[MENU] Playing vibe for track '{track["id"]}'")
res = await self.update_vibe(
@@ -296,14 +333,12 @@ class MyVibeButton(Button, VoiceExtension):
)
if not res:
logging.warning('[MENU] Failed to start the vibe')
await interaction.respond('Не удалось запустить "Мою Волну". Попробуйте позже.', ephemeral=True)
logging.info('[MENU] Failed to start the vibe')
await interaction.respond('Не удалось запустить "Мою Волну". Возможно, у вас нет подписки на Яндекс Музыку.', ephemeral=True)
next_track = await self.db.get_track(interaction.guild_id, 'next')
if next_track:
# Need to avoid additional feedback.
# TODO: Make it more elegant
await self._play_track(interaction, next_track, button_callback=True)
await self.play_track(interaction, next_track, button_callback=True)
class MyVibeSelect(Select, VoiceExtension):
def __init__(self, *args, **kwargs):
@@ -350,7 +385,7 @@ class MyVibeSelect(Select, VoiceExtension):
await interaction.edit(view=view)
class MyVibeSettingsView(View, VoiceExtension):
def __init__(self, interaction: Interaction, *items: Item, timeout: float | None = None, disable_on_timeout: bool = True):
def __init__(self, interaction: Interaction, *items: Item, timeout: float | None = 360, disable_on_timeout: bool = True):
View.__init__(self, *items, timeout=timeout, disable_on_timeout=disable_on_timeout)
VoiceExtension.__init__(self, None)
self.interaction = interaction
@@ -412,6 +447,13 @@ class MyVibeSettingsView(View, VoiceExtension):
self.add_item(select)
return self
async def on_timeout(self) -> None:
try:
return await super().on_timeout()
except HTTPException:
pass
self.stop()
class MyVibeSettingsButton(Button, VoiceExtension):
def __init__(self, **kwargs):
@@ -455,18 +497,32 @@ class AddToPlaylistSelect(Select, VoiceExtension):
if not current_track:
return
res = await self.ym_client.users_playlists_insert_track(
kind=f"{playlist.kind}",
track_id=current_track['id'],
album_id=current_track['albums'][0]['id'],
revision=playlist.revision or 1,
user_id=f"{playlist.uid}"
)
if res:
await interaction.respond('✅ Добавлено в плейлист', delete_after=15, ephemeral=True)
tracks = [track.id for track in playlist.tracks]
track_in_playlist = current_track['id'] in tracks
if track_in_playlist:
index = tracks.index(current_track['id'])
res = await self.ym_client.users_playlists_delete_track(
kind=f"{playlist.kind}",
from_=index,
to=index + 1,
revision=playlist.revision or 1
)
else:
res = await self.ym_client.users_playlists_insert_track(
kind=f"{playlist.kind}",
track_id=current_track['id'],
album_id=current_track['albums'][0]['id'],
revision=playlist.revision or 1
)
if not res:
await interaction.respond('❌ Что-то пошло не так. Попробуйте позже.', delete_after=15, ephemeral=True)
elif track_in_playlist:
await interaction.respond('🗑 Трек был удалён из плейлиста.', delete_after=15, ephemeral=True)
else:
await interaction.respond('📩 Трек был добавлен в плейлист.', delete_after=15, ephemeral=True)
class AddToPlaylistButton(Button, VoiceExtension):
def __init__(self, **kwargs):
@@ -477,6 +533,11 @@ class AddToPlaylistButton(Button, VoiceExtension):
if not await self.voice_check(interaction) or not interaction.guild_id:
return
current_track = await self.db.get_track(interaction.guild_id, 'current')
if not current_track:
await interaction.respond('❌ Нет воспроизводимого трека.', delete_after=15, ephemeral=True)
return
client = await self.init_ym_client(interaction)
if not client:
await interaction.respond('❌ Что-то пошло не так. Попробуйте позже.', delete_after=15, ephemeral=True)
@@ -527,12 +588,12 @@ class MenuView(View, VoiceExtension):
self.add_to_playlist_button = AddToPlaylistButton(style=ButtonStyle.secondary, emoji='📁', row=1)
self.vibe_button = MyVibeButton(style=ButtonStyle.secondary, emoji='🌊', row=1)
self.vibe_settings_button = MyVibeSettingsButton(style=ButtonStyle.success, emoji='🛠', row=1)
async def init(self, *, disable: bool = False) -> Self:
if not self.ctx.guild_id:
return self
self.guild = await self.db.get_guild(self.ctx.guild_id)
self.guild = await self.db.get_guild(self.ctx.guild_id, projection={'repeat': 1, 'shuffle': 1, 'current_track': 1, 'vibing': 1})
if self.guild['repeat']:
self.repeat_button.style = ButtonStyle.success
@@ -583,10 +644,12 @@ class MenuView(View, VoiceExtension):
if self.guild['current_menu']:
await self.stop_playing(self.ctx)
await self.db.update(self.ctx.guild_id, {'current_menu': None, 'previous_tracks': [], 'vibing': False})
message = await self.get_menu_message(self.ctx, self.guild['current_menu'])
if message:
await message.delete()
logging.debug('[MENU] Successfully deleted menu message')
else:
logging.debug('[MENU] No menu message found')
self.stop()

View File

@@ -2,7 +2,7 @@ from math import ceil
from typing import Self, Any
from discord.ui import View, Button, Item
from discord import ApplicationContext, ButtonStyle, Interaction, Embed
from discord import ApplicationContext, ButtonStyle, Interaction, Embed, HTTPException
from MusicBot.cogs.utils.voice_extension import VoiceExtension
@@ -27,10 +27,11 @@ class QueueNextButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
if not interaction.user or not interaction.guild:
return
user = await self.users_db.get_user(interaction.user.id)
page = user['queue_page'] + 1
await self.users_db.update(interaction.user.id, {'queue_page': page})
@@ -42,10 +43,11 @@ class QueuePrevButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
if not interaction.user or not interaction.guild:
return
user = await self.users_db.get_user(interaction.user.id)
page = user['queue_page'] - 1
await self.users_db.update(interaction.user.id, {'queue_page': page})
@@ -54,30 +56,36 @@ class QueuePrevButton(Button, VoiceExtension):
await interaction.edit(embed=embed, view=await QueueView(interaction).init())
class QueueView(View, VoiceExtension):
def __init__(self, ctx: ApplicationContext | Interaction, *items: Item, timeout: float | None = 360, disable_on_timeout: bool = True):
def __init__(self, ctx: ApplicationContext | Interaction, *items: Item, timeout: float | None = 360, disable_on_timeout: bool = False):
View.__init__(self, *items, timeout=timeout, disable_on_timeout=disable_on_timeout)
VoiceExtension.__init__(self, None)
self.ctx = ctx
self.next_button = QueueNextButton(style=ButtonStyle.primary, emoji='▶️')
self.prev_button = QueuePrevButton(style=ButtonStyle.primary, emoji='◀️')
async def init(self) -> Self:
if not self.ctx.user or not self.ctx.guild:
return self
tracks = await self.db.get_tracks_list(self.ctx.guild.id, 'next')
user = await self.users_db.get_user(self.ctx.user.id)
count = 15 * user['queue_page']
if not tracks[count + 15:]:
self.next_button.disabled = True
if not tracks[:count]:
self.prev_button.disabled = True
self.add_item(self.prev_button)
self.add_item(self.next_button)
return self
async def on_timeout(self) -> None:
try:
await super().on_timeout()
except HTTPException:
pass
self.stop()