Files
YandexMusicDiscordBot/MusicBot/cogs/utils/voice.py
2025-01-10 16:19:27 +03:00

357 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import aiohttp
import asyncio
from math import ceil
from typing import cast
from io import BytesIO
from PIL import Image
from yandex_music import Track, ClientAsync
import discord
from discord import Interaction, ApplicationContext
from MusicBot.database import VoiceGuildsDatabase, BaseUsersDatabase
# This should be in find.py but recursive import is a thing
async def generate_player_embed(track: Track) -> discord.Embed:
"""Generate track embed for player.
Args:
track (yandex_music.Track): Track to be processed.
Returns:
discord.Embed: Track embed.
"""
title = cast(str, track.title) # casted types are always there, blame JS for that
avail = cast(bool, track.available)
artists = track.artists_name()
albums = [cast(str, album.title) for album in track.albums]
lyrics = cast(bool, track.lyrics_available)
duration = cast(int, track.duration_ms)
explicit = track.explicit or track.content_warning
bg_video = track.background_video_uri
metadata = track.meta_data
year = track.albums[0].year
artist = track.artists[0]
cover_url = track.get_cover_url('400x400')
color = await get_average_color_from_url(cover_url)
if explicit:
title += ' <:explicit:1325879701117472869>'
duration_m = duration // 60000
duration_s = ceil(duration / 1000) - duration_m * 60
artist_url = f"https://music.yandex.ru/artist/{artist.id}"
artist_cover = artist.cover
if not artist_cover:
artist_cover_url = artist.get_op_image_url()
else:
artist_cover_url = artist_cover.get_url()
embed = discord.Embed(
title=title,
description=", ".join(albums),
color=color,
)
embed.set_thumbnail(url=cover_url)
embed.set_author(name=", ".join(artists), url=artist_url, icon_url=artist_cover_url)
embed.add_field(name="Текст песни", value="Есть" if lyrics else "Нет")
embed.add_field(name="Длительность", value=f"{duration_m}:{duration_s:02}")
if year:
embed.add_field(name="Год выпуска", value=str(year))
if metadata:
if metadata.year:
embed.add_field(name="Год выхода", value=str(metadata.year))
if metadata.number:
embed.add_field(name="Позиция", value=str(metadata.number))
if metadata.composer:
embed.add_field(name="Композитор", value=metadata.composer)
if metadata.version:
embed.add_field(name="Версия", value=metadata.version)
if bg_video:
embed.add_field(name="Видеофон", value=f"[Ссылка]({bg_video})")
if not avail:
embed.set_footer(text=f"Трек в данный момент недоступен.")
return embed
async def get_average_color_from_url(url: str) -> int:
"""Get image from url and calculate its average color to use in embeds.
Args:
url (str): Image url.
Returns:
int: RGB Hex code. 0x000 if failed.
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
response = await response.read()
img = Image.open(BytesIO(response))
img = img.convert('RGB')
width, height = img.size
r_total, g_total, b_total = 0, 0, 0
for y in range(height):
for x in range(width):
r, g, b = cast(tuple, img.getpixel((x, y)))
r_total += r
g_total += g
b_total += b
count = width * height
r = r_total // count
g = g_total // count
b = b_total // count
return (r << 16) + (g << 8) + b
except Exception:
return 0x000
class VoiceExtension:
def __init__(self) -> None:
self.db = VoiceGuildsDatabase()
self.users_db = BaseUsersDatabase()
async def update_player_embed(self, ctx: ApplicationContext | Interaction, player_mid: int) -> None:
"""Update current player message by its id.
Args:
ctx (ApplicationContext | Interaction): Context.
player_mid (int): Id of the player message. There can only be only one player in the guild.
"""
if isinstance(ctx, Interaction):
player = ctx.client.get_message(player_mid)
else:
player = await ctx.fetch_message(player_mid)
if not player:
return
guild = ctx.guild
user = ctx.user
if guild and user:
token = self.users_db.get_ym_token(user.id)
current_track = self.db.get_track(guild.id, 'current')
track = cast(Track, Track.de_json(current_track, client=ClientAsync(token))) # type: ignore
embed = await generate_player_embed(track)
if isinstance(ctx, Interaction) and ctx.message and ctx.message.id == player_mid:
# If interaction from player buttons
await ctx.edit(embed=embed)
else:
# If interaction from other buttons. They should have thair own response.
await player.edit(embed=embed)
async def voice_check(self, ctx: ApplicationContext | Interaction) -> bool:
"""Check if bot can perform voice tasks and respond if failed.
Args:
ctx (discord.ApplicationContext): Command context.
Returns:
bool: Check result.
"""
if not ctx.user:
return False
token = self.users_db.get_ym_token(ctx.user.id)
if not token:
await ctx.respond('❌ Необходимо указать свой токен доступа с помощью комманды /login.', delete_after=15, ephemeral=True)
return False
channel = ctx.channel
if not isinstance(channel, discord.VoiceChannel):
await ctx.respond("❌ Вы должны отправить команду в голосовом канале.", delete_after=15, ephemeral=True)
return False
if isinstance(ctx, Interaction):
channels = ctx.client.voice_clients
else:
channels = ctx.bot.voice_clients
voice_chat = discord.utils.get(channels, guild=ctx.guild)
if not voice_chat:
await ctx.respond("❌ Добавьте бота в голосовой канал при помощи команды /voice join.", delete_after=15, ephemeral=True)
return False
return True
def get_voice_client(self, ctx: ApplicationContext | Interaction) -> discord.VoiceClient | None:
"""Return voice client for the given guild id. Return None if not present.
Args:
ctx (ApplicationContext | Interaction): Command context.
Returns:
discord.VoiceClient | None: Voice client.
"""
if isinstance(ctx, Interaction):
voice_chat = discord.utils.get(ctx.client.voice_clients, guild=ctx.guild)
else:
voice_chat = discord.utils.get(ctx.bot.voice_clients, guild=ctx.guild)
return cast(discord.VoiceClient, voice_chat)
async def play_track(self, ctx: ApplicationContext | Interaction, track: Track) -> str | None:
"""Download ``track`` by its id and play it in the voice channel. Return track title on success and don't respond.
If sound is already playing, add track id to the queue and respond.
Args:
ctx (ApplicationContext | Interaction): Context
track (Track): Track class with id and title specified.
Returns:
str | None: Song title or None.
"""
if not ctx.guild:
return
vc = self.get_voice_client(ctx)
if not vc:
return
if isinstance(ctx, Interaction):
loop = ctx.client.loop
else:
loop = ctx.bot.loop
gid = ctx.guild.id
guild = self.db.get_guild(gid)
await track.download_async(f'music/{ctx.guild_id}.mp3')
song = discord.FFmpegPCMAudio(f'music/{ctx.guild_id}.mp3', options='-vn -filter:a "volume=0.15"')
vc.play(song, after=lambda exc: asyncio.run_coroutine_threadsafe(self.next_track(ctx), loop))
self.db.set_current_track(gid, track)
self.db.update(gid, {'is_stopped': False})
player = guild['current_player']
if player is not None:
await self.update_player_embed(ctx, player)
return track.title
def pause_playing(self, ctx: ApplicationContext | Interaction) -> None:
vc = self.get_voice_client(ctx)
if vc:
vc.pause()
def resume_playing(self, ctx: ApplicationContext | Interaction) -> None:
vc = self.get_voice_client(ctx)
if vc:
vc.resume()
def stop_playing(self, ctx: ApplicationContext | Interaction) -> None:
if not ctx.guild:
return
vc = self.get_voice_client(ctx)
if vc:
self.db.update(ctx.guild.id, {'current_track': None, 'is_stopped': True})
vc.stop()
async def next_track(self, ctx: ApplicationContext | Interaction) -> str | None:
"""Switch to the next track in the queue. Return track title on success.
Stop playing if tracks list is empty.
Args:
ctx (ApplicationContext | Interaction): Context
Returns:
str | None: Track title or None.
"""
if not ctx.guild or not ctx.user:
return
gid = ctx.guild.id
guild = self.db.get_guild(gid)
token = self.users_db.get_ym_token(ctx.user.id)
if guild.get('is_stopped'):
return
if not self.get_voice_client(ctx): # Silently return if bot got kicked
return
current_track = guild['current_track']
next_track = self.db.get_track(gid, 'next')
if next_track and current_track:
self.db.modify_track(gid, current_track, 'previous', 'insert')
ym_track = Track.de_json(next_track, client=ClientAsync(token)) # type: ignore
self.stop_playing(ctx)
return await self.play_track(ctx, ym_track) # type: ignore
elif next_track:
ym_track = Track.de_json(next_track, client=ClientAsync(token)) # type: ignore
self.stop_playing(ctx)
return await self.play_track(ctx, ym_track) # type: ignore
elif current_track:
self.db.modify_track(gid, current_track, 'previous', 'insert')
self.stop_playing(ctx)
async def prev_track(self, ctx: ApplicationContext | Interaction) -> str | None:
"""Switch to the previous track in the queue. Repeat curren the song if no previous tracks.
Return track title on success.
Args:
ctx (ApplicationContext | Interaction): Context.
Returns:
str | None: Track title or None.
"""
if not ctx.guild or not ctx.user:
return
gid = ctx.guild.id
token = self.users_db.get_ym_token(ctx.user.id)
current_track = self.db.get_track(gid, 'current')
prev_track = self.db.get_track(gid, 'previous')
if prev_track:
if current_track:
self.db.modify_track(gid, current_track, 'next', 'insert')
ym_track = Track.de_json(prev_track, client=ClientAsync(token)) # type: ignore
self.stop_playing(ctx)
return await self.play_track(ctx, ym_track) # type: ignore
elif current_track:
return await self.repeat_current_track(ctx)
async def repeat_current_track(self, ctx: ApplicationContext | Interaction) -> str | None:
"""Repeat current track. Return track title on success.
Args:
ctx (ApplicationContext | Interaction): Context
Returns:
str | None: Track title or None.
"""
if not ctx.guild or not ctx.user:
return
gid = ctx.guild.id
token = self.users_db.get_ym_token(ctx.user.id)
current_track = self.db.get_track(gid, 'current')
if current_track:
ym_track = Track.de_json(current_track, client=ClientAsync(token)) # type: ignore
self.stop_playing(ctx)
return await self.play_track(ctx, ym_track) # type: ignore