impr: Code and logs improvement.

This commit is contained in:
Lemon4ksan
2025-01-24 22:43:42 +03:00
parent 3f9698fa7b
commit 3adcde37eb
5 changed files with 240 additions and 151 deletions

View File

@@ -1,5 +1,5 @@
import logging
from typing import cast
from typing import Literal, cast
import discord
from yandex_music import Track, Album, Artist, Playlist
@@ -31,6 +31,7 @@ class PlayButton(Button, VoiceExtension):
guild = self.db.get_guild(gid)
channel = cast(discord.VoiceChannel, interaction.channel)
member = cast(discord.Member, interaction.user)
action: Literal['add_track', 'add_album', 'add_artist', 'add_playlist']
if isinstance(self.item, Track):
tracks = [self.item]
@@ -38,39 +39,46 @@ class PlayButton(Button, VoiceExtension):
vote_message = f"{member.mention} хочет добавить трек **{self.item.title}** в очередь.\n\n Голосуйте за добавление."
response_message = f"Трек **{self.item.title}** был добавлен в очередь."
play_message = f"Сейчас играет: **{self.item.title}**!"
elif isinstance(self.item, Album):
album = await self.item.with_tracks_async()
if not album or not album.volumes:
logging.debug("Failed to fetch album tracks")
await interaction.respond("Не удалось получить треки альбома.", ephemeral=True)
return
tracks = [track for volume in album.volumes for track in volume]
action = 'add_album'
vote_message = f"{member.mention} хочет добавить альбом **{self.item.title}** в очередь.\n\n Голосуйте за добавление."
response_message = f"Альбом **{self.item.title}** был добавлен в очередь."
play_message = f"Сейчас играет: **{self.item.title}**!"
elif isinstance(self.item, Artist):
artist_tracks = await self.item.get_tracks_async()
if not artist_tracks:
logging.debug("Failed to fetch artist tracks")
await interaction.respond("Не удалось получить треки артиста.", ephemeral=True)
return
tracks = artist_tracks.tracks.copy()
action = 'add_artist'
vote_message = f"{member.mention} хочет добавить треки от **{self.item.name}** в очередь.\n\n Голосуйте за добавление."
response_message = f"Песни артиста **{self.item.name}** были добавлены в очередь."
play_message = f"Сейчас играет: **{self.item.name}**!"
elif isinstance(self.item, Playlist):
short_tracks = await self.item.fetch_tracks_async()
if not short_tracks:
logging.debug("Failed to fetch playlist tracks")
await interaction.respond("Не удалось получить треки из плейлиста.", delete_after=15)
return
tracks = [cast(Track, short_track.track) for short_track in short_tracks]
action = 'add_playlist'
vote_message = f"{member.mention} хочет добавить плейлист **{self.item.title}** в очередь.\n\n Голосуйте за добавление."
response_message = f"Плейлист **{self.item.title}** был добавлен в очередь."
play_message = f"Сейчас играет: **{self.item.title}**!"
elif isinstance(self.item, list):
tracks = self.item.copy()
if not tracks:
@@ -81,16 +89,19 @@ class PlayButton(Button, VoiceExtension):
action = 'add_playlist'
vote_message = f"{member.mention} хочет добавить плейлист **** в очередь.\n\n Голосуйте за добавление."
response_message = f"Плейлист **«Мне нравится»** был добавлен в очередь."
play_message = f"Сейчас играет: **{tracks[0].title}**!"
else:
raise ValueError(f"Unknown item type: '{type(self.item).__name__}'")
if guild.get(f'vote_{action}') and len(channel.members) > 2 and not member.guild_permissions.manage_channels:
logging.debug(f"Starting vote for '{action}'")
message = cast(discord.Interaction, await interaction.respond(vote_message, delete_after=30))
response = await message.original_response()
await response.add_reaction('')
await response.add_reaction('')
self.db.update_vote(
gid,
response.id,
@@ -104,27 +115,30 @@ class PlayButton(Button, VoiceExtension):
)
else:
logging.debug(f"Skipping vote for '{action}'")
if guild['current_track'] is not None:
self.db.modify_track(gid, tracks, 'next', 'extend')
response_message = response_message
else:
track = tracks.pop(0)
self.db.modify_track(gid, tracks, 'next', 'extend')
await self.play_track(interaction, track)
response_message = play_message
response_message = f"Сейчас играет: **{tracks[0].title}**!"
current_player = None
if guild['current_player']:
current_player = await self.get_player_message(interaction, guild['current_player'])
if current_player and interaction.message:
logging.debug(f"Deleting interaction message '{interaction.message.id}': current player '{current_player.id}' found")
await interaction.message.delete()
await interaction.respond(response_message, delete_after=15)
if current_player and interaction.message:
logging.debug(f"Deleting interaction message '{interaction.message.id}': current player '{current_player.id}' found")
await interaction.message.delete()
else:
await interaction.respond(response_message, delete_after=15)
class ListenView(View):
def __init__(self, item: Track | Album | Artist | Playlist | list[Track], *items: Item, timeout: float | None = None, disable_on_timeout: bool = False):
super().__init__(*items, timeout=timeout, disable_on_timeout=disable_on_timeout)
logging.debug(f"Creating view for type: '{type(item).__name__}'")
if isinstance(item, Track):
link_app = f"yandexmusic://album/{item.albums[0].id}/track/{item.id}"
link_web = f"https://music.yandex.ru/album/{item.albums[0].id}/track/{item.id}"
@@ -140,9 +154,11 @@ class ListenView(View):
elif isinstance(item, list): # Can't open other person's likes
self.add_item(PlayButton(item, label="Слушать в голосовом канале", style=ButtonStyle.gray))
return
self.button1: Button = Button(label="Слушать в приложении", style=ButtonStyle.gray, url=link_app)
self.button2: Button = Button(label="Слушать в браузере", style=ButtonStyle.gray, url=link_web)
self.button3: PlayButton = PlayButton(item, label="Слушать в голосовом канале", style=ButtonStyle.gray)
if item.available:
# self.add_item(self.button1) # Discord doesn't allow well formed URLs in buttons for some reason.
self.add_item(self.button2)
@@ -158,6 +174,7 @@ async def generate_item_embed(item: Track | Album | Artist | Playlist) -> Embed:
discord.Embed: Item embed.
"""
logging.debug(f"Generating embed for type: '{type(item).__name__}'")
if isinstance(item, Track):
return await generate_track_embed(item)
elif isinstance(item, Album):

View File

@@ -1,6 +1,6 @@
import asyncio
import logging
from typing import Literal, cast
from typing import Any, Literal, cast
from yandex_music import Track, ClientAsync
@@ -23,7 +23,7 @@ class VoiceExtension:
Args:
ctx (ApplicationContext | Interaction): Context.
player_mid (int): Id of the player message. There can only be only one player in the guild.
Returns:
bool: True if updated, False if not.
"""
@@ -31,18 +31,18 @@ class VoiceExtension:
f"Updating player embed using " +
"interaction context" if isinstance(ctx, Interaction) else
"application context" if isinstance(ctx, ApplicationContext) else
"raw reaction context" + " ..."
"raw reaction context"
)
player = await self.get_player_message(ctx, player_mid)
if not player:
return False
gid = ctx.guild_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.guild.id if ctx.guild else None
uid = ctx.user_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.user.id if ctx.user else None
if not gid or not uid:
logging.warning("Guild ID or User ID not found in context")
logging.warning("Guild ID or User ID not found in context inside 'update_player_embed'")
return False
player = await self.get_player_message(ctx, player_mid)
if not player:
return False
token = self.users_db.get_ym_token(uid)
@@ -120,9 +120,8 @@ class VoiceExtension:
Returns:
bool: Check result.
"""
logging.debug("Checking voice requirements...")
if not ctx.user:
logging.warning("User not found in context.")
logging.warning("User not found in context inside 'voice_check'")
return False
token = self.users_db.get_ym_token(ctx.user.id)
@@ -159,14 +158,13 @@ class VoiceExtension:
Returns:
discord.VoiceClient | None: Voice client or None.
"""
logging.debug("Getting voice client...")
if isinstance(ctx, Interaction):
voice_chat = discord.utils.get(ctx.client.voice_clients, guild=ctx.guild)
elif isinstance(ctx, RawReactionActionEvent):
if not self.bot:
raise ValueError("Bot instance is not set.")
if not ctx.guild_id:
logging.warning("Guild ID not found in context")
logging.warning("Guild ID not found in context inside get_voice_client")
return None
voice_chat = discord.utils.get(self.bot.voice_clients, guild=await self.bot.fetch_guild(ctx.guild_id))
elif isinstance(ctx, ApplicationContext):
@@ -175,19 +173,25 @@ class VoiceExtension:
raise ValueError(f"Invalid context type: '{type(ctx).__name__}'.")
if voice_chat:
logging.debug(f"Voice client found")
logging.debug("Voice client found")
else:
logging.debug("Voice client not found")
return cast((discord.VoiceClient | None), voice_chat)
async def play_track(self, ctx: ApplicationContext | Interaction | RawReactionActionEvent, track: Track) -> str | None:
async def play_track(
self,
ctx: ApplicationContext | Interaction | RawReactionActionEvent,
track: Track,
vc: discord.VoiceClient | None = None
) -> str | None:
"""Download ``track`` by its id and play it in the voice channel. Return track title on success.
If sound is already playing, add track id to the queue. There's no response to the context.
Args:
ctx (ApplicationContext | Interaction): Context
track (Track): Track to play.
vc (discord.VoiceClient | None): Voice client.
Returns:
str | None: Song title or None.
@@ -195,12 +199,13 @@ class VoiceExtension:
gid = ctx.guild_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.guild.id if ctx.guild else None
uid = ctx.user_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.user.id if ctx.user else None
if not gid or not uid:
logging.warning("Guild ID or User ID not found in context")
logging.warning("Guild ID or User ID not found in context inside 'play_track'")
return None
vc = await self.get_voice_client(ctx)
if not vc:
return None
vc = await self.get_voice_client(ctx)
if not vc:
return None
if isinstance(ctx, Interaction):
loop = ctx.client.loop
@@ -218,7 +223,7 @@ class VoiceExtension:
song = discord.FFmpegPCMAudio(f'music/{gid}.mp3', options='-vn -filter:a "volume=0.15"')
vc.play(song, after=lambda exc: asyncio.run_coroutine_threadsafe(self.next_track(ctx, after=True), loop))
logging.debug(f"Playing track '{track.title}'")
logging.info(f"Playing track '{track.title}'")
self.db.set_current_track(gid, track)
self.db.update(gid, {'is_stopped': False})
@@ -229,36 +234,44 @@ class VoiceExtension:
return track.title
async def stop_playing(self, ctx: ApplicationContext | Interaction | RawReactionActionEvent) -> None:
logging.debug("Stopping playback...")
async def stop_playing(self, ctx: ApplicationContext | Interaction | RawReactionActionEvent, vc: discord.VoiceClient | None = None) -> None:
gid = ctx.guild_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.guild.id if ctx.guild else None
if not gid:
logging.warning("Guild ID not found in context")
return
vc = await self.get_voice_client(ctx)
if not vc:
vc = await self.get_voice_client(ctx)
if vc:
logging.debug("Stopping playback")
self.db.update(gid, {'current_track': None, 'is_stopped': True})
vc.stop()
return
async def next_track(self, ctx: ApplicationContext | Interaction | RawReactionActionEvent, *, after: bool = False) -> str | None:
async def next_track(
self,
ctx: ApplicationContext | Interaction | RawReactionActionEvent,
vc: discord.VoiceClient | None = None,
*,
after: bool = False
) -> str | None:
"""Switch to the next track in the queue. Return track title on success.
Doesn't change track if stopped. Stop playing if tracks list is empty.
Args:
ctx (ApplicationContext | Interaction): Context
vc (discord.VoiceClient, optional): Voice client.
after (bool, optional): Whether the function is being called by the after callback. Defaults to False.
Returns:
str | None: Track title or None.
"""
logging.debug("Switching to the next track")
gid = ctx.guild_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.guild.id if ctx.guild else None
uid = ctx.user_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.user.id if ctx.user else None
if not gid or not uid:
logging.warning("Guild ID or User ID not found in context.")
return
logging.warning("Guild ID or User ID not found in context inside 'next_track'")
return None
guild = self.db.get_guild(gid)
token = self.users_db.get_ym_token(uid)
@@ -270,11 +283,13 @@ class VoiceExtension:
logging.debug("Playback is stopped, skipping...")
return None
if not await self.get_voice_client(ctx): # Silently return if bot got kicked
logging.debug("Voice client not found")
return None
if not vc:
vc = await self.get_voice_client(ctx)
if not vc: # Silently return if bot got kicked
return None
if guild['repeat'] and after:
logging.debug("Repeating current track")
next_track = guild['current_track']
elif guild['shuffle']:
logging.debug("Shuffling tracks")
@@ -283,7 +298,8 @@ class VoiceExtension:
logging.debug("Getting next track")
next_track = self.db.get_track(gid, 'next')
if guild['current_track'] and guild['current_player']:
if guild['current_track'] and guild['current_player'] and not guild['repeat']:
logging.debug("Adding current track to history")
self.db.modify_track(gid, guild['current_track'], 'previous', 'insert')
if next_track:
@@ -291,19 +307,20 @@ class VoiceExtension:
next_track,
client=ClientAsync(token) # type: ignore # Async client can be used here.
)
await self.stop_playing(ctx)
await self.stop_playing(ctx, vc)
title = await self.play_track(
ctx,
ym_track # type: ignore # de_json should always work here.
ym_track, # type: ignore # de_json should always work here.
vc
)
if after and not guild['current_player'] and not isinstance(ctx, discord.RawReactionActionEvent):
await ctx.respond(f"Сейчас играет: **{title}**!", delete_after=15)
return title
else:
self.db.update(gid, {'is_stopped': True, 'current_track': None})
logging.info("No next track found")
self.db.update(gid, {'is_stopped': True, 'current_track': None})
return None
async def prev_track(self, ctx: ApplicationContext | Interaction) -> str | None:
@@ -316,9 +333,8 @@ class VoiceExtension:
Returns:
str | None: Track title or None.
"""
logging.debug("Switching to the previous track")
if not ctx.guild or not ctx.user:
logging.debug("Guild or User not found in context")
logging.warning("Guild or User not found in context inside 'prev_track'")
return None
gid = ctx.guild.id
@@ -328,11 +344,11 @@ class VoiceExtension:
if not token:
logging.debug(f"No token found for user {ctx.user.id}")
return
return None
if prev_track:
logging.debug("Previous track found")
track = prev_track
track: dict[str, Any] | None = prev_track
elif current_track:
logging.debug("No previous track found. Repeating current track")
track = self.db.get_track(gid, 'current')
@@ -363,7 +379,7 @@ class VoiceExtension:
str | None: Track title or None.
"""
if not ctx.guild or not ctx.user:
logging.warning("Guild or User not found in context.")
logging.warning("Guild or User not found in context inside 'like_track'")
return None
current_track = self.db.get_track(ctx.guild.id, 'current')