impr: Code organization and bug fixes.

This commit is contained in:
Lemon4ksan
2025-02-06 23:05:02 +03:00
parent f7ad7556d1
commit 3cb971b473
8 changed files with 786 additions and 581 deletions

View File

@@ -5,44 +5,34 @@ from discord.ui import View, Button, Item, Select
from discord import VoiceChannel, ButtonStyle, Interaction, ApplicationContext, RawReactionActionEvent, Embed, ComponentType, SelectOption
import yandex_music.exceptions
from yandex_music import Track, Playlist, ClientAsync as YMClient
from yandex_music import TrackLyrics, Playlist, ClientAsync as YMClient
from MusicBot.cogs.utils.voice_extension import VoiceExtension, menu_views
class ToggleRepeatButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
class ToggleButton(Button, VoiceExtension):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.info('[MENU] Repeat button callback...')
if not await self.voice_check(interaction) or not interaction.guild:
async def callback(self, interaction: Interaction):
callback_type = interaction.custom_id
if callback_type not in ('repeat', 'shuffle'):
raise ValueError(f"Invalid callback type: '{callback_type}'")
logging.info(f'[MENU] {callback_type.capitalize()} button callback')
if not (gid := interaction.guild_id):
logging.warning('[MENU] Failed to get guild ID.')
await interaction.respond("❌ Что-то пошло не так. Попробуйте снова.", delete_after=15, ephemeral=True)
return
gid = interaction.guild.id
guild = await self.db.get_guild(gid)
await self.db.update(gid, {'repeat': not guild['repeat']})
if gid in menu_views:
menu_views[gid].stop()
menu_views[gid] = await MenuView(interaction).init()
await interaction.edit(view=menu_views[gid])
class ToggleShuffleButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.info('[MENU] Shuffle button callback...')
if not await self.voice_check(interaction) or not interaction.guild:
if not await self.voice_check(interaction, check_vibe_privilage=True):
return
gid = interaction.guild.id
guild = await self.db.get_guild(gid)
await self.db.update(gid, {'shuffle': not guild['shuffle']})
if gid in menu_views:
menu_views[gid].stop()
menu_views[gid] = await MenuView(interaction).init()
await interaction.edit(view=menu_views[gid])
guild = await self.db.get_guild(gid)
await self.db.update(gid, {callback_type: not guild[callback_type]})
if not await self.update_menu_view(interaction, guild, button_callback=True):
await interaction.respond("❌ Что-то пошло не так. Попробуйте снова.", delete_after=15, ephemeral=True)
class PlayPauseButton(Button, VoiceExtension):
def __init__(self, **kwargs):
@@ -51,11 +41,10 @@ class PlayPauseButton(Button, VoiceExtension):
async def callback(self, interaction: Interaction) -> None:
logging.info('[MENU] Play/Pause button callback...')
if not await self.voice_check(interaction):
if not await self.voice_check(interaction, check_vibe_privilage=True):
return
vc = await self.get_voice_client(interaction)
if not vc or not interaction.message:
if not (vc := await self.get_voice_client(interaction)) or not interaction.message:
return
try:
@@ -73,117 +62,95 @@ class PlayPauseButton(Button, VoiceExtension):
await interaction.edit(embed=embed)
class NextTrackButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
class SwitchTrackButton(Button, VoiceExtension):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.info('[MENU] Next track button callback...')
if not await self.voice_check(interaction):
async def callback(self, interaction: Interaction):
callback_type = interaction.custom_id
if callback_type not in ('next', 'previous'):
raise ValueError(f"Invalid callback type: '{callback_type}'")
logging.info(f'[MENU] {callback_type.capitalize()} track button callback')
if not await self.voice_check(interaction, check_vibe_privilage=True):
return
title = await self.next_track(interaction, button_callback=True)
if callback_type == 'next':
title = await self.next_track(interaction, button_callback=True)
else:
title = await self.prev_track(interaction, button_callback=True)
if not title:
await interaction.respond(f"❌ Нет треков в очереди.", delete_after=15, ephemeral=True)
class PrevTrackButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
class ReactionButton(Button, VoiceExtension):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.info('[MENU] Previous track button callback...')
if not await self.voice_check(interaction):
return
title = await self.prev_track(interaction, button_callback=True)
if not title:
await interaction.respond(f"Нет треков в истории.", delete_after=15, ephemeral=True)
async def callback(self, interaction: Interaction):
callback_type = interaction.custom_id
if callback_type not in ('like', 'dislike'):
raise ValueError(f"Invalid callback type: '{callback_type}'")
class LikeButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
logging.info(f'[MENU] {callback_type.capitalize()} button callback')
async def callback(self, interaction: Interaction) -> None:
logging.info('[MENU] Like button callback...')
if not await self.voice_check(interaction, check_vibe_privilage=False):
return
if not interaction.guild:
return
gid = interaction.guild.id
if not (vc := await self.get_voice_client(interaction)) or not vc.is_playing:
await interaction.respond("❌ Нет воспроизводимого трека.", delete_after=15, ephemeral=True)
await self.like_track(interaction)
if gid in menu_views:
menu_views[gid].stop()
menu_views[gid] = await MenuView(interaction).init()
await interaction.edit(view=menu_views[gid])
class DislikeButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.info('[MENU] Dislike button callback...')
if not await self.voice_check(interaction):
if not await self.voice_check(interaction) or not (gid := interaction.guild_id):
return
if not (vc := await self.get_voice_client(interaction)) or not vc.is_playing:
await interaction.respond("❌ Нет воспроизводимого трека.", delete_after=15, ephemeral=True)
res = await self.dislike_track(interaction)
if res:
logging.debug("[VC_EXT] Disliked track")
res = await self.react_track(interaction, callback_type)
if callback_type == 'like' and res[0]:
await self._update_menu_views_dict(interaction)
await interaction.edit(view=menu_views[gid])
elif callback_type == 'dislike' and res[0]:
await self.next_track(interaction, vc=vc, button_callback=True)
else:
logging.debug("[VC_EXT] Failed to dislike track")
await interaction.respond("Не удалось поставить дизлайк. Попробуйте позже.")
logging.debug(f"[VC_EXT] Failed to {callback_type} track")
await interaction.respond("Операция не удалась. Попробуйте позже.")
class LyricsButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
super().__init__(**kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.info('[MENU] Lyrics button callback...')
if not await self.voice_check(interaction, check_vibe_privilage=False) or not interaction.guild_id or not interaction.user:
if not await self.voice_check(interaction) or not interaction.guild_id or not interaction.user:
return
ym_token = await self.users_db.get_ym_token(interaction.user.id)
current_track = await self.db.get_track(interaction.guild_id, 'current')
if not current_track or not ym_token:
client = await self.init_ym_client(interaction)
if not client:
return
track = cast(Track, Track.de_json(
current_track,
YMClient(ym_token), # type: ignore # Async client can be used here
))
current_track = await self.db.get_track(interaction.guild_id, 'current')
if not current_track:
logging.debug('[MENU] No current track found')
return
try:
lyrics = await track.get_lyrics_async()
lyrics = cast(TrackLyrics, await client.tracks_lyrics(current_track['id']))
except yandex_music.exceptions.NotFoundError:
logging.debug('[MENU] Lyrics not found')
await interaction.respond("❌ Текст песни не найден. Яндекс нам соврал (опять)!", delete_after=15, ephemeral=True)
return
if not lyrics:
logging.debug('[MENU] Lyrics not found')
return
embed = Embed(
title=track.title,
title=current_track['title'],
description='**Текст песни**',
color=0xfed42b,
)
text = await lyrics.fetch_lyrics_async()
for subtext in text.split('\n\n'):
embed.add_field(name='', value=subtext, inline=False)
await interaction.respond(embed=embed, ephemeral=True)
class MyVibeButton(Button, VoiceExtension):
@@ -192,62 +159,75 @@ class MyVibeButton(Button, VoiceExtension):
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.info('[VIBE] My vibe button callback')
logging.info('[MENU] My vibe button callback')
if not await self.voice_check(interaction):
return
if not interaction.guild_id:
logging.warning('[VIBE] No guild id in button callback')
logging.warning('[MENU] No guild id in button callback')
return
track = await self.db.get_track(interaction.guild_id, 'current')
if track:
logging.info(f"[MENU] Playing vibe for track '{track["id"]}'")
await self.update_vibe(
res = await self.update_vibe(
interaction,
'track',
track['id'],
button_callback=True
track['id']
)
else:
logging.info('[VIBE] Playing on your wave')
await self.update_vibe(
logging.info('[MENU] Playing station user:onyourwave')
res = await self.update_vibe(
interaction,
'user',
'onyourwave',
button_callback=True
'onyourwave'
)
if not res:
logging.warning('[MENU] Failed to start the vibe')
await interaction.respond('Не удалось запустить "Мою Волну". Попробуйте позже.', ephemeral=True)
next_track = await self.db.get_track(interaction.guild_id, 'next')
if next_track:
# Need to avoid additional feedback.
# TODO: Make it more elegant
await self._play_next_track(interaction, next_track, button_callback=True)
class MyVibeSelect(Select, VoiceExtension):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.info('[VIBE] My vibe select callback')
logging.info('[MENU] My vibe select callback')
if not await self.voice_check(interaction):
return
if not interaction.user:
logging.warning('[VIBE] No user in select callback')
logging.warning('[MENU] No user in select callback')
return
custom_id = interaction.custom_id
if custom_id not in ('diversity', 'mood', 'lang'):
logging.warning(f'[VIBE] Unknown custom_id: {custom_id}')
logging.warning(f'[MENU] Unknown custom_id: {custom_id}')
return
data = interaction.data
if not data or 'values' not in data:
logging.warning('[VIBE] No data in select callback')
if not interaction.data or 'values' not in interaction.data:
logging.warning('[MENU] No data in select callback')
return
data_value = data['values'][0]
data_value = interaction.data['values'][0]
if data_value not in (
'fun', 'active', 'calm', 'sad', 'all',
'favorite', 'popular', 'discover', 'default',
'not-russian', 'russian', 'without-words', 'any'
):
logging.warning(f'[VIBE] Unknown data_value: {data_value}')
logging.warning(f'[MENU] Unknown data_value: {data_value}')
return
logging.info(f"[VIBE] Settings option '{custom_id}' updated to {data_value}")
logging.info(f"[MENU] Settings option '{custom_id}' updated to {data_value}")
await self.users_db.update(interaction.user.id, {f'vibe_settings.{custom_id}': data_value})
view = MyVibeSettingsView(interaction)
@@ -262,16 +242,15 @@ class MyVibeSettingsView(View, VoiceExtension):
def __init__(self, interaction: Interaction, *items: Item, timeout: float | None = 360, disable_on_timeout: bool = True):
View.__init__(self, *items, timeout=timeout, disable_on_timeout=disable_on_timeout)
VoiceExtension.__init__(self, None)
self.interaction = interaction
async def init(self) -> None:
async def init(self) -> Self:
if not self.interaction.user:
logging.warning('[VIBE] No user in settings view')
return
logging.warning('[MENU] No user in settings view')
return self
settings = (await self.users_db.get_user(self.interaction.user.id, projection={'vibe_settings'}))['vibe_settings']
diversity_settings = settings['diversity']
diversity = [
SelectOption(label='Любое', value='default'),
@@ -279,7 +258,7 @@ class MyVibeSettingsView(View, VoiceExtension):
SelectOption(label='Незнакомое', value='discover', default=diversity_settings == 'discover'),
SelectOption(label='Популярное', value='popular', default=diversity_settings == 'popular')
]
mood_settings = settings['mood']
mood = [
SelectOption(label='Любое', value='all'),
@@ -288,7 +267,7 @@ class MyVibeSettingsView(View, VoiceExtension):
SelectOption(label='Спокойное', value='calm', default=mood_settings == 'calm'),
SelectOption(label='Грустное', value='sad', default=mood_settings == 'sad')
]
lang_settings = settings['lang']
lang = [
SelectOption(label='Любое', value='any'),
@@ -321,17 +300,19 @@ class MyVibeSettingsView(View, VoiceExtension):
for select in [feel_select, mood_select, lang_select]:
self.add_item(select)
return self
class MyVibeSettingsButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
super().__init__(**kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
logging.info('[VIBE] My vibe settings button callback')
if not await self.voice_check(interaction):
logging.info('[MENU] My vibe settings button callback')
if not await self.voice_check(interaction, check_vibe_privilage=True):
return
await interaction.respond('Настройки "Моей Волны"', view=MyVibeSettingsView(interaction), ephemeral=True)
await interaction.respond('Настройки "Моей Волны"', view=await MyVibeSettingsView(interaction).init(), ephemeral=True)
class AddToPlaylistSelect(Select, VoiceExtension):
def __init__(self, ym_client: YMClient, *args, **kwargs):
@@ -340,8 +321,11 @@ class AddToPlaylistSelect(Select, VoiceExtension):
self.ym_client = ym_client
async def callback(self, interaction: Interaction):
if not await self.voice_check(interaction, check_vibe_privilage=False):
logging.info('[MENU] Add to playlist select callback')
if not await self.voice_check(interaction):
return
if not interaction.guild_id or not interaction.data or 'values' not in interaction.data:
logging.warning('[MENU] No data in select callback')
return
@@ -351,19 +335,17 @@ class AddToPlaylistSelect(Select, VoiceExtension):
playlist = cast(Playlist, await self.ym_client.users_playlists(kind=data[0], user_id=data[1]))
current_track = await self.db.get_track(interaction.guild_id, 'current')
if not current_track:
return
try:
res = await self.ym_client.users_playlists_insert_track(
kind=f"{playlist.kind}",
track_id=current_track['id'],
album_id=current_track['albums'][0]['id'],
revision=playlist.revision or 1,
user_id=f"{playlist.uid}"
)
except yandex_music.exceptions.NetworkError:
res = None
res = await self.ym_client.users_playlists_insert_track(
kind=f"{playlist.kind}",
track_id=current_track['id'],
album_id=current_track['albums'][0]['id'],
revision=playlist.revision or 1,
user_id=f"{playlist.uid}"
)
if res:
await interaction.respond('✅ Добавлено в плейлист', delete_after=15, ephemeral=True)
@@ -376,7 +358,7 @@ class AddToPlaylistButton(Button, VoiceExtension):
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction):
if not await self.voice_check(interaction, check_vibe_privilage=False) or not interaction.guild_id:
if not await self.voice_check(interaction) or not interaction.guild_id:
return
client = await self.init_ym_client(interaction)
@@ -412,14 +394,14 @@ class MenuView(View, VoiceExtension):
VoiceExtension.__init__(self, None)
self.ctx = ctx
self.repeat_button = ToggleRepeatButton(style=ButtonStyle.secondary, emoji='🔂', row=0)
self.shuffle_button = ToggleShuffleButton(style=ButtonStyle.secondary, emoji='🔀', row=0)
self.repeat_button = ToggleButton(style=ButtonStyle.secondary, emoji='🔂', row=0, custom_id='repeat')
self.shuffle_button = ToggleButton(style=ButtonStyle.secondary, emoji='🔀', row=0, custom_id='shuffle')
self.play_pause_button = PlayPauseButton(style=ButtonStyle.primary, emoji='', row=0)
self.next_button = NextTrackButton(style=ButtonStyle.primary, emoji='', row=0)
self.prev_button = PrevTrackButton(style=ButtonStyle.primary, emoji='', row=0)
self.next_button = SwitchTrackButton(style=ButtonStyle.primary, emoji='', row=0, custom_id='next')
self.prev_button = SwitchTrackButton(style=ButtonStyle.primary, emoji='', row=0, custom_id='previous')
self.like_button = LikeButton(style=ButtonStyle.secondary, emoji='❤️', row=1)
self.dislike_button = DislikeButton(style=ButtonStyle.secondary, emoji='💔', row=1)
self.like_button = ReactionButton(style=ButtonStyle.secondary, emoji='❤️', row=1, custom_id='like')
self.dislike_button = ReactionButton(style=ButtonStyle.secondary, emoji='💔', row=1, custom_id='dislike')
self.lyrics_button = LyricsButton(style=ButtonStyle.secondary, emoji='📋', row=1)
self.add_to_playlist_button = AddToPlaylistButton(style=ButtonStyle.secondary, emoji='📁', row=1)
self.vibe_button = MyVibeButton(style=ButtonStyle.secondary, emoji='🌊', row=1)
@@ -474,7 +456,7 @@ class MenuView(View, VoiceExtension):
return self
async def on_timeout(self) -> None:
logging.debug('Menu timed out...')
logging.debug('[MENU] Menu timed out. Deleting menu message')
if not self.ctx.guild_id:
return
@@ -484,6 +466,6 @@ class MenuView(View, VoiceExtension):
message = await self.get_menu_message(self.ctx, self.guild['current_menu'])
if message:
await message.delete()
logging.debug('Successfully deleted menu message')
logging.debug('[MENU] Successfully deleted menu message')
else:
logging.debug('No menu message found')
logging.debug('[MENU] No menu message found')