impr: Further code review.

This commit is contained in:
Lemon4ksan
2025-01-25 14:09:35 +03:00
parent 3adcde37eb
commit 6a20ab11d1
9 changed files with 241 additions and 134 deletions

View File

@@ -11,9 +11,9 @@ from yandex_music import ClientAsync as YMClient
from yandex_music import Track, Album, Artist, Playlist
from MusicBot.database import BaseUsersDatabase, BaseGuildsDatabase
from MusicBot.cogs.utils.find import ListenView, generate_item_embed
from MusicBot.cogs.utils.misc import generate_playlists_embed, generate_likes_embed
from MusicBot.cogs.utils.views import MyPlaylists
from MusicBot.ui import ListenView, MyPlaylists, generate_playlists_embed
from MusicBot.cogs.utils.embeds import generate_item_embed
def setup(bot):
bot.add_cog(General(bot))
@@ -38,17 +38,17 @@ class General(Cog):
logging.info(f"Help command invoked by {ctx.user.id} for command '{command}'")
response_message = None
embed = discord.Embed(
title='Помощь',
color=0xfed42b
)
embed.set_author(name='YandexMusic')
embed.description = '__Использование__\n'
embed.set_author(name='Помощь')
if command == 'all':
embed.description = ("Данный бот позволяет вам слушать музыку из вашего аккаунта Yandex Music.\n"
"Зарегистрируйте свой токен с помощью /login. Его можно получить [здесь](https://github.com/MarshalX/yandex-music-api/discussions/513).\n"
"Для получения помощи для конкретной команды, введите /help <команда>.\n\n"
"**Для доп. помощи, зайдите на [сервер любителей Яндекс Музыки](https://discord.gg/gkmFDaPMeC).**")
embed.title = 'Помощь'
embed.add_field(
name='__Основные команды__',
@@ -64,7 +64,6 @@ class General(Cog):
"""
)
embed.set_author(name='YandexMusic')
embed.set_footer(text='©️ Bananchiki')
elif command == 'account':
embed.description += ("Ввести токен от Яндекс Музыки. Его можно получить [здесь](https://github.com/MarshalX/yandex-music-api/discussions/513).\n"
@@ -87,7 +86,7 @@ class General(Cog):
elif command == 'settings':
embed.description += ("Получить текущие настройки.\n```/settings show```\n"
"Разрешить или запретить воспроизведение Explicit треков и альбомов. Если автор или плейлист содержат Explicit треки, убираются кнопки для доступа к ним.\n```/settings explicit```\n"
"Разрешить или запретить создание меню проигрывателя, даже если в канале больше одного человека.\n```/settings menu```\n"
"Разрешить или запретить создание меню проигрывателя, когда в канале больше одного человека.\n```/settings menu```\n"
"Разрешить или запретить голосование.\n```/settings vote <тип голосования>```\n"
"`Примечание`: Только пользователи с разрешением управления каналом могут менять настройки.")
elif command == 'track':
@@ -99,7 +98,7 @@ class General(Cog):
elif command == 'voice':
embed.description += ("Присоединить бота в голосовой канал. Требует разрешения управления каналом.\n ```/voice join```\n"
"Заставить бота покинуть голосовой канал. Требует разрешения управления каналом.\n ```/voice leave```\n"
"Создать меню проигрывателя. Доступно только если вы единственный в голосовом канале.\n```/voice menu```")
"Создать меню проигрывателя. Доступность зависит от настроек сервера. По умолчанию работает только когда в канале один человек.\n```/voice menu```")
else:
response_message = '❌ Неизвестная команда.'
embed = None
@@ -154,7 +153,7 @@ class General(Cog):
real_tracks = await gather(*[track_short.fetch_track_async() for track_short in likes.tracks], return_exceptions=True)
tracks = [track for track in real_tracks if not isinstance(track, BaseException)] # Can't fetch user tracks
embed = generate_likes_embed(tracks)
embed = await generate_item_embed(tracks)
logging.info(f"Successfully fetched likes for user {ctx.user.id}")
await ctx.respond(embed=embed, view=ListenView(tracks))

View File

@@ -0,0 +1,7 @@
from .embeds import generate_item_embed
from .voice_extension import VoiceExtension
__all__ = [
"generate_item_embed",
"VoiceExtension",
]

View File

@@ -1,4 +1,5 @@
from typing import Any, cast
import logging
from typing import cast
from math import ceil
from os import getenv
@@ -9,7 +10,31 @@ from PIL import Image
from yandex_music import Track, Album, Artist, Playlist, Label
from discord import Embed
def generate_likes_embed(tracks: list[Track]) -> Embed:
async def generate_item_embed(item: Track | Album | Artist | Playlist | list[Track]) -> Embed:
"""Generate item embed.
Args:
item (yandex_music.Track | yandex_music.Album | yandex_music.Artist | yandex_music.Playlist): Item to be processed.
Returns:
discord.Embed: Item embed.
"""
logging.debug(f"Generating embed for type: '{type(item).__name__}'")
if isinstance(item, Track):
return await _generate_track_embed(item)
elif isinstance(item, Album):
return await _generate_album_embed(item)
elif isinstance(item, Artist):
return await _generate_artist_embed(item)
elif isinstance(item, Playlist):
return await _generate_playlist_embed(item)
elif isinstance(item, list):
return _generate_likes_embed(item)
else:
raise ValueError(f"Unknown item type: {type(item).__name__}")
def _generate_likes_embed(tracks: list[Track]) -> Embed:
track_count = len(tracks)
cover_url = "https://avatars.yandex.net/get-music-user-playlist/11418140/favorit-playlist-cover.bb48fdb9b9f4/300x300"
@@ -34,37 +59,7 @@ def generate_likes_embed(tracks: list[Track]) -> Embed:
return embed
def generate_playlists_embed(page: int, playlists: list[tuple[str, int]]) -> Embed:
count = 15 * page
length = len(playlists)
embed = Embed(
title=f"Всего плейлистов: {length}",
color=0xfed42b
)
embed.set_author(name="Ваши плейлисты")
embed.set_footer(text=f"Страница {page + 1} из {ceil(length / 10)}")
for playlist in playlists[count:count + 10]:
embed.add_field(name=playlist[0], value=f"{playlist[1]} треков", inline=False)
return embed
def generate_queue_embed(page: int, tracks_list: list[dict[str, Any]]) -> Embed:
count = 15 * page
length = len(tracks_list)
embed = Embed(
title=f"Всего: {length}",
color=0xfed42b,
)
embed.set_author(name="Очередь треков")
embed.set_footer(text=f"Страница {page + 1} из {ceil(length / 15)}")
for i, track in enumerate(tracks_list[count:count + 15], start=1 + count):
duration = track['duration_ms']
if duration:
duration_m = duration // 60000
duration_s = ceil(duration / 1000) - duration_m * 60
embed.add_field(name=f"{i} - {track['title']} - {duration_m}:{duration_s:02d}", value="", inline=False)
return embed
async def generate_track_embed(track: Track) -> Embed:
async def _generate_track_embed(track: Track) -> Embed:
title = cast(str, track.title)
avail = cast(bool, track.available)
artists = track.artists_name()
@@ -78,7 +73,7 @@ async def generate_track_embed(track: Track) -> Embed:
artist = track.artists[0]
cover_url = track.get_cover_url('400x400')
color = await get_average_color_from_url(cover_url)
color = await _get_average_color_from_url(cover_url)
if explicit:
explicit_eid = getenv('EXPLICIT_EID')
@@ -131,7 +126,7 @@ async def generate_track_embed(track: Track) -> Embed:
return embed
async def generate_album_embed(album: Album) -> Embed:
async def _generate_album_embed(album: Album) -> Embed:
title = cast(str, album.title)
track_count = album.track_count
artists = album.artists_name()
@@ -146,7 +141,7 @@ async def generate_album_embed(album: Album) -> Embed:
artist = album.artists[0]
cover_url = album.get_cover_url('400x400')
color = await get_average_color_from_url(cover_url)
color = await _get_average_color_from_url(cover_url)
if isinstance(album.labels[0], Label):
labels = [cast(Label, label).name for label in album.labels]
@@ -204,7 +199,7 @@ async def generate_album_embed(album: Album) -> Embed:
return embed
async def generate_artist_embed(artist: Artist) -> Embed:
async def _generate_artist_embed(artist: Artist) -> Embed:
name = cast(str, artist.name)
likes_count = artist.likes_count
avail = cast(bool, artist.available)
@@ -217,7 +212,7 @@ async def generate_artist_embed(artist: Artist) -> Embed:
cover_url = artist.get_op_image_url('400x400')
else:
cover_url = artist.cover.get_url(size='400x400')
color = await get_average_color_from_url(cover_url)
color = await _get_average_color_from_url(cover_url)
embed = Embed(
title=name,
@@ -249,7 +244,7 @@ async def generate_artist_embed(artist: Artist) -> Embed:
return embed
async def generate_playlist_embed(playlist: Playlist) -> Embed:
async def _generate_playlist_embed(playlist: Playlist) -> Embed:
title = cast(str, playlist.title)
track_count = playlist.track_count
avail = cast(bool, playlist.available)
@@ -272,7 +267,7 @@ async def generate_playlist_embed(playlist: Playlist) -> Embed:
continue
if cover_url:
color = await get_average_color_from_url(cover_url)
color = await _get_average_color_from_url(cover_url)
embed = Embed(
title=title,
@@ -303,7 +298,7 @@ async def generate_playlist_embed(playlist: Playlist) -> Embed:
return embed
async def get_average_color_from_url(url: str) -> int:
async def _get_average_color_from_url(url: str) -> int:
"""Get image from url and calculate its average color to use in embeds.
Args:

View File

@@ -1,187 +0,0 @@
import logging
from typing import Literal, cast
import discord
from yandex_music import Track, Album, Artist, Playlist
from discord.ui import View, Button, Item
from discord import ButtonStyle, Interaction, Embed
from MusicBot.cogs.utils.voice_extension import VoiceExtension
from MusicBot.cogs.utils.misc import generate_track_embed, generate_album_embed, generate_artist_embed, generate_playlist_embed
class PlayButton(Button, VoiceExtension):
def __init__(self, item: Track | Album | Artist | Playlist | list[Track], **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
self.item = item
async def callback(self, interaction: Interaction) -> None:
logging.debug(f"Callback triggered for type: '{type(self.item).__name__}'")
if not interaction.guild:
logging.warning("No guild found in context.")
return
if not await self.voice_check(interaction):
logging.debug("Voice check failed")
return
gid = interaction.guild.id
guild = self.db.get_guild(gid)
channel = cast(discord.VoiceChannel, interaction.channel)
member = cast(discord.Member, interaction.user)
action: Literal['add_track', 'add_album', 'add_artist', 'add_playlist']
if isinstance(self.item, Track):
tracks = [self.item]
action = 'add_track'
vote_message = f"{member.mention} хочет добавить трек **{self.item.title}** в очередь.\n\n Голосуйте за добавление."
response_message = f"Трек **{self.item.title}** был добавлен в очередь."
play_message = f"Сейчас играет: **{self.item.title}**!"
elif isinstance(self.item, Album):
album = await self.item.with_tracks_async()
if not album or not album.volumes:
logging.debug("Failed to fetch album tracks")
await interaction.respond("Не удалось получить треки альбома.", ephemeral=True)
return
tracks = [track for volume in album.volumes for track in volume]
action = 'add_album'
vote_message = f"{member.mention} хочет добавить альбом **{self.item.title}** в очередь.\n\n Голосуйте за добавление."
response_message = f"Альбом **{self.item.title}** был добавлен в очередь."
play_message = f"Сейчас играет: **{self.item.title}**!"
elif isinstance(self.item, Artist):
artist_tracks = await self.item.get_tracks_async()
if not artist_tracks:
logging.debug("Failed to fetch artist tracks")
await interaction.respond("Не удалось получить треки артиста.", ephemeral=True)
return
tracks = artist_tracks.tracks.copy()
action = 'add_artist'
vote_message = f"{member.mention} хочет добавить треки от **{self.item.name}** в очередь.\n\n Голосуйте за добавление."
response_message = f"Песни артиста **{self.item.name}** были добавлены в очередь."
play_message = f"Сейчас играет: **{self.item.name}**!"
elif isinstance(self.item, Playlist):
short_tracks = await self.item.fetch_tracks_async()
if not short_tracks:
logging.debug("Failed to fetch playlist tracks")
await interaction.respond("Не удалось получить треки из плейлиста.", delete_after=15)
return
tracks = [cast(Track, short_track.track) for short_track in short_tracks]
action = 'add_playlist'
vote_message = f"{member.mention} хочет добавить плейлист **{self.item.title}** в очередь.\n\n Голосуйте за добавление."
response_message = f"Плейлист **{self.item.title}** был добавлен в очередь."
play_message = f"Сейчас играет: **{self.item.title}**!"
elif isinstance(self.item, list):
tracks = self.item.copy()
if not tracks:
logging.debug("Empty tracks list")
await interaction.respond("Не удалось получить треки.", delete_after=15)
return
action = 'add_playlist'
vote_message = f"{member.mention} хочет добавить плейлист **** в очередь.\n\n Голосуйте за добавление."
response_message = f"Плейлист **«Мне нравится»** был добавлен в очередь."
else:
raise ValueError(f"Unknown item type: '{type(self.item).__name__}'")
if guild.get(f'vote_{action}') and len(channel.members) > 2 and not member.guild_permissions.manage_channels:
logging.debug(f"Starting vote for '{action}'")
message = cast(discord.Interaction, await interaction.respond(vote_message, delete_after=30))
response = await message.original_response()
await response.add_reaction('')
await response.add_reaction('')
self.db.update_vote(
gid,
response.id,
{
'positive_votes': list(),
'negative_votes': list(),
'total_members': len(channel.members),
'action': action,
'vote_content': [track.to_dict() for track in tracks]
}
)
else:
logging.debug(f"Skipping vote for '{action}'")
if guild['current_track'] is not None:
self.db.modify_track(gid, tracks, 'next', 'extend')
else:
track = tracks.pop(0)
self.db.modify_track(gid, tracks, 'next', 'extend')
await self.play_track(interaction, track)
response_message = f"Сейчас играет: **{tracks[0].title}**!"
current_player = None
if guild['current_player']:
current_player = await self.get_player_message(interaction, guild['current_player'])
if current_player and interaction.message:
logging.debug(f"Deleting interaction message '{interaction.message.id}': current player '{current_player.id}' found")
await interaction.message.delete()
else:
await interaction.respond(response_message, delete_after=15)
class ListenView(View):
def __init__(self, item: Track | Album | Artist | Playlist | list[Track], *items: Item, timeout: float | None = None, disable_on_timeout: bool = False):
super().__init__(*items, timeout=timeout, disable_on_timeout=disable_on_timeout)
logging.debug(f"Creating view for type: '{type(item).__name__}'")
if isinstance(item, Track):
link_app = f"yandexmusic://album/{item.albums[0].id}/track/{item.id}"
link_web = f"https://music.yandex.ru/album/{item.albums[0].id}/track/{item.id}"
elif isinstance(item, Album):
link_app = f"yandexmusic://album/{item.id}"
link_web = f"https://music.yandex.ru/album/{item.id}"
elif isinstance(item, Artist):
link_app = f"yandexmusic://artist/{item.id}"
link_web = f"https://music.yandex.ru/artist/{item.id}"
elif isinstance(item, Playlist):
link_app = f"yandexmusic://playlist/{item.playlist_uuid}"
link_web = f"https://music.yandex.ru/playlist/{item.playlist_uuid}"
elif isinstance(item, list): # Can't open other person's likes
self.add_item(PlayButton(item, label="Слушать в голосовом канале", style=ButtonStyle.gray))
return
self.button1: Button = Button(label="Слушать в приложении", style=ButtonStyle.gray, url=link_app)
self.button2: Button = Button(label="Слушать в браузере", style=ButtonStyle.gray, url=link_web)
self.button3: PlayButton = PlayButton(item, label="Слушать в голосовом канале", style=ButtonStyle.gray)
if item.available:
# self.add_item(self.button1) # Discord doesn't allow well formed URLs in buttons for some reason.
self.add_item(self.button2)
self.add_item(self.button3)
async def generate_item_embed(item: Track | Album | Artist | Playlist) -> Embed:
"""Generate item embed.
Args:
item (yandex_music.Track | yandex_music.Album | yandex_music.Artist | yandex_music.Playlist): Item to be processed.
Returns:
discord.Embed: Item embed.
"""
logging.debug(f"Generating embed for type: '{type(item).__name__}'")
if isinstance(item, Track):
return await generate_track_embed(item)
elif isinstance(item, Album):
return await generate_album_embed(item)
elif isinstance(item, Artist):
return await generate_artist_embed(item)
elif isinstance(item, Playlist):
return await generate_playlist_embed(item)
else:
raise ValueError(f"Unknown item type: {type(item).__name__}")

View File

@@ -1,126 +0,0 @@
import logging
from discord.ui import View, Button, Item
from discord import ButtonStyle, Interaction, ApplicationContext
from MusicBot.cogs.utils.voice_extension import VoiceExtension
class ToggleRepeatButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.debug('Repeat button callback...')
if not interaction.guild:
return
gid = interaction.guild.id
guild = self.db.get_guild(gid)
self.db.update(gid, {'repeat': not guild['repeat']})
await interaction.edit(view=Player(interaction))
class ToggleShuffleButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.debug('Shuffle button callback...')
if not interaction.guild:
return
gid = interaction.guild.id
guild = self.db.get_guild(gid)
self.db.update(gid, {'shuffle': not guild['shuffle']})
await interaction.edit(view=Player(interaction))
class PlayPauseButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.debug('Play/Pause button callback...')
if not await self.voice_check(interaction):
return
vc = await self.get_voice_client(interaction)
if not vc or not interaction.message:
return
embed = interaction.message.embeds[0]
if vc.is_paused():
vc.resume()
embed.remove_footer()
else:
vc.pause()
embed.set_footer(text='Приостановлено')
await interaction.edit(embed=embed)
class NextTrackButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.debug('Next track button callback...')
if not await self.voice_check(interaction):
return
title = await self.next_track(interaction)
if not title:
await interaction.respond(f"Нет треков в очереди.", delete_after=15, ephemeral=True)
class PrevTrackButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.debug('Previous track button callback...')
if not await self.voice_check(interaction):
return
title = await self.prev_track(interaction)
if not title:
await interaction.respond(f"Нет треков в истории.", delete_after=15, ephemeral=True)
class LikeButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.debug('Like button callback...')
if await self.voice_check(interaction):
vc = await self.get_voice_client(interaction)
if not vc or not vc.is_playing:
await interaction.respond("Нет воспроизводимого трека.", delete_after=15, ephemeral=True)
result = await self.like_track(interaction)
if not result:
await interaction.respond("❌ Операция не удалась.", delete_after=15, ephemeral=True)
elif result == 'TRACK REMOVED':
await interaction.respond("Трек был удалён из избранного.", delete_after=15, ephemeral=True)
else:
await interaction.respond(f"Трек **{result}** был добавлен в избранное.", delete_after=15, ephemeral=True)
class Player(View, VoiceExtension):
def __init__(self, ctx: ApplicationContext | Interaction, *items: Item, timeout: float | None = 3600, disable_on_timeout: bool = True):
View.__init__(self, *items, timeout=timeout, disable_on_timeout=disable_on_timeout)
VoiceExtension.__init__(self, None)
if not ctx.guild:
return
guild = self.db.get_guild(ctx.guild.id)
self.repeat_button = ToggleRepeatButton(style=ButtonStyle.success if guild['repeat'] else ButtonStyle.secondary, emoji='🔂', row=0)
self.shuffle_button = ToggleShuffleButton(style=ButtonStyle.success if guild['shuffle'] else ButtonStyle.secondary, emoji='🔀', row=0)
self.play_pause_button = PlayPauseButton(style=ButtonStyle.primary, emoji='', row=0)
self.next_button = NextTrackButton(style=ButtonStyle.primary, emoji='', row=0)
self.prev_button = PrevTrackButton(style=ButtonStyle.primary, emoji='', row=0)
self.queue_button = Button(style=ButtonStyle.primary, emoji='📋', row=1)
self.add_item(self.repeat_button)
self.add_item(self.prev_button)
self.add_item(self.play_pause_button)
self.add_item(self.next_button)
self.add_item(self.shuffle_button)

View File

@@ -1,105 +0,0 @@
from discord.ui import View, Button, Item
from discord import ButtonStyle, Interaction, ApplicationContext
from MusicBot.cogs.utils.voice_extension import VoiceExtension
from MusicBot.cogs.utils.misc import generate_playlists_embed, generate_queue_embed
class MPNextButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
if not interaction.user:
return
user = self.users_db.get_user(interaction.user.id)
page = user['playlists_page'] + 1
self.users_db.update(interaction.user.id, {'playlists_page': page})
embed = generate_playlists_embed(page, user['playlists'])
await interaction.edit(embed=embed, view=MyPlaylists(interaction))
class MPPrevButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
if not interaction.user:
return
user = self.users_db.get_user(interaction.user.id)
page = user['playlists_page'] - 1
self.users_db.update(interaction.user.id, {'playlists_page': page})
embed = generate_playlists_embed(page, user['playlists'])
await interaction.edit(embed=embed, view=MyPlaylists(interaction))
class MyPlaylists(View, VoiceExtension):
def __init__(self, ctx: ApplicationContext | Interaction, *items: Item, timeout: float | None = 3600, disable_on_timeout: bool = True):
View.__init__(self, *items, timeout=timeout, disable_on_timeout=disable_on_timeout)
VoiceExtension.__init__(self, None)
if not ctx.user:
return
user = self.users_db.get_user(ctx.user.id)
count = 10 * user['playlists_page']
next_button = MPNextButton(style=ButtonStyle.primary, emoji='▶️')
prev_button = MPPrevButton(style=ButtonStyle.primary, emoji='◀️')
if not user['playlists'][count + 10:]:
next_button.disabled = True
if not user['playlists'][:count]:
prev_button.disabled = True
self.add_item(prev_button)
self.add_item(next_button)
class QNextButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
if not interaction.user or not interaction.guild:
return
user = self.users_db.get_user(interaction.user.id)
page = user['queue_page'] + 1
self.users_db.update(interaction.user.id, {'queue_page': page})
tracks = self.db.get_tracks_list(interaction.guild.id, 'next')
embed = generate_queue_embed(page, tracks)
await interaction.edit(embed=embed, view=QueueView(interaction))
class QPrevButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
if not interaction.user or not interaction.guild:
return
user = self.users_db.get_user(interaction.user.id)
page = user['queue_page'] - 1
self.users_db.update(interaction.user.id, {'queue_page': page})
tracks = self.db.get_tracks_list(interaction.guild.id, 'next')
embed = generate_queue_embed(page, tracks)
await interaction.edit(embed=embed, view=QueueView(interaction))
class QueueView(View, VoiceExtension):
def __init__(self, ctx: ApplicationContext | Interaction, *items: Item, timeout: float | None = 3600, disable_on_timeout: bool = True):
View.__init__(self, *items, timeout=timeout, disable_on_timeout=disable_on_timeout)
VoiceExtension.__init__(self, None)
if not ctx.user or not ctx.guild:
return
tracks = self.db.get_tracks_list(ctx.guild.id, 'next')
user = self.users_db.get_user(ctx.user.id)
count = 15 * user['queue_page']
next_button = QNextButton(style=ButtonStyle.primary, emoji='▶️')
prev_button = QPrevButton(style=ButtonStyle.primary, emoji='◀️')
if not tracks[count + 15:]:
next_button.disabled = True
if not tracks[:count]:
prev_button.disabled = True
self.add_item(prev_button)
self.add_item(next_button)

View File

@@ -2,12 +2,12 @@ import asyncio
import logging
from typing import Any, Literal, cast
from yandex_music import Track, ClientAsync
from yandex_music import Track, TrackShort, ClientAsync
import discord
from discord import Interaction, ApplicationContext, RawReactionActionEvent
from MusicBot.cogs.utils.misc import generate_track_embed
from MusicBot.cogs.utils import generate_item_embed
from MusicBot.database import VoiceGuildsDatabase, BaseUsersDatabase
class VoiceExtension:
@@ -17,7 +17,7 @@ class VoiceExtension:
self.db = VoiceGuildsDatabase()
self.users_db = BaseUsersDatabase()
async def update_player_embed(self, ctx: ApplicationContext | Interaction | RawReactionActionEvent, player_mid: int) -> bool:
async def update_menu_embed(self, ctx: ApplicationContext | Interaction | RawReactionActionEvent, player_mid: int) -> bool:
"""Update current player message by its id. Return True if updated, False if not.
Args:
@@ -59,7 +59,7 @@ class VoiceExtension:
current_track,
client=ClientAsync(token) # type: ignore # Async client can be used here.
))
embed = await generate_track_embed(track)
embed = await generate_item_embed(track)
if isinstance(ctx, Interaction) and ctx.message and ctx.message.id == player_mid:
# If interaction from player buttons
@@ -230,7 +230,7 @@ class VoiceExtension:
player = guild['current_player']
if player is not None:
await self.update_player_embed(ctx, player)
await self.update_menu_embed(ctx, player)
return track.title
@@ -369,6 +369,35 @@ class VoiceExtension:
return None
async def get_likes(self, ctx: ApplicationContext | Interaction) -> list[TrackShort] | None:
"""Get liked tracks. Return list of tracks on success.
Return None if no token found.
Args:
ctx (ApplicationContext | Interaction): Context.
Returns:
list[Track] | None: List of tracks or None.
"""
if not ctx.guild or not ctx.user:
logging.warning("Guild or User not found in context inside 'like_track'")
return None
current_track = self.db.get_track(ctx.guild.id, 'current')
token = self.users_db.get_ym_token(ctx.user.id)
if not current_track or not token:
logging.debug("Current track or token not found")
return None
client = await ClientAsync(token).init()
likes = await client.users_likes_tracks()
if not likes:
logging.debug("No likes found")
return None
return likes.tracks
async def like_track(self, ctx: ApplicationContext | Interaction) -> str | Literal['TRACK REMOVED'] | None:
"""Like current track. Return track title on success.
@@ -389,9 +418,8 @@ class VoiceExtension:
return None
client = await ClientAsync(token).init()
likes = await client.users_likes_tracks()
likes = await self.get_likes(ctx)
if not likes:
logging.debug("No likes found")
return None
ym_track = cast(Track, Track.de_json(
@@ -399,7 +427,7 @@ class VoiceExtension:
client=client # type: ignore # Async client can be used here.
)
)
if ym_track.id not in [track.id for track in likes.tracks]:
if str(ym_track.id) not in [str(track.id) for track in likes]:
logging.debug("Track not found in likes. Adding...")
await ym_track.like_async()
return ym_track.title

View File

@@ -6,10 +6,8 @@ from discord.ext.commands import Cog
from yandex_music import Track, ClientAsync
from MusicBot.cogs.utils.voice_extension import VoiceExtension
from MusicBot.cogs.utils.player import Player
from MusicBot.cogs.utils.misc import generate_queue_embed, generate_track_embed
from MusicBot.cogs.utils.views import QueueView
from MusicBot.cogs.utils import VoiceExtension, generate_item_embed
from MusicBot.ui import MenuView, QueueView, generate_queue_embed
def setup(bot: discord.Bot):
bot.add_cog(Voice(bot))
@@ -31,6 +29,7 @@ class Voice(Cog, VoiceExtension):
gid = member.guild.id
guild = self.db.get_guild(gid)
discord_guild = await self.typed_bot.fetch_guild(gid)
current_player = self.db.get_current_player(gid)
channel = after.channel or before.channel
if not channel:
@@ -44,7 +43,6 @@ class Voice(Cog, VoiceExtension):
self.db.update(gid, {'previous_tracks': [], 'next_tracks': [], 'current_track': None, 'is_stopped': True})
vc.stop()
elif len(channel.members) > 2 and not guild['always_allow_menu']:
current_player = self.db.get_current_player(gid)
if current_player:
logging.info(f"Disabling current player for guild {gid} due to multiple members")
@@ -52,6 +50,7 @@ class Voice(Cog, VoiceExtension):
try:
message = await channel.fetch_message(current_player)
await message.delete()
await channel.send("Меню отключено из-за большого количества участников.", delete_after=15)
except (discord.NotFound, discord.Forbidden):
pass
@@ -201,7 +200,7 @@ class Voice(Cog, VoiceExtension):
return
if guild['current_track']:
embed = await generate_track_embed(
embed = await generate_item_embed(
Track.de_json(
guild['current_track'],
client=ClientAsync() # type: ignore # Async client can be used here.
@@ -215,10 +214,11 @@ class Voice(Cog, VoiceExtension):
if guild['current_player']:
logging.info(f"Deleteing old player menu {guild['current_player']} in guild {ctx.guild.id}")
message = await ctx.fetch_message(guild['current_player'])
await message.delete()
message = await self.get_player_message(ctx, guild['current_player'])
if message:
await message.delete()
interaction = cast(discord.Interaction, await ctx.respond(view=Player(ctx), embed=embed, delete_after=3600))
interaction = cast(discord.Interaction, await ctx.respond(view=await MenuView(ctx).init(), embed=embed, delete_after=3600))
response = await interaction.original_response()
self.db.update(ctx.guild.id, {'current_player': response.id})
@@ -306,7 +306,7 @@ class Voice(Cog, VoiceExtension):
player = self.db.get_current_player(ctx.guild.id)
if player:
await self.update_player_embed(ctx, player)
await self.update_menu_embed(ctx, player)
logging.info(f"Track paused in guild {ctx.guild.id}")
await ctx.respond("Воспроизведение приостановлено.", delete_after=15, ephemeral=True)
@@ -330,7 +330,7 @@ class Voice(Cog, VoiceExtension):
vc.resume()
player = self.db.get_current_player(ctx.guild.id)
if player:
await self.update_player_embed(ctx, player)
await self.update_menu_embed(ctx, player)
logging.info(f"Track resumed in guild {ctx.guild.id}")
await ctx.respond("Воспроизведение восстановлено.", delete_after=15, ephemeral=True)
else: