Merge pull request #1 from Lemon4ksan/dev

Обновление бота #1
This commit is contained in:
Bananchiki
2025-02-13 18:10:52 +03:00
committed by GitHub
10 changed files with 376 additions and 266 deletions

View File

@@ -12,17 +12,18 @@ from yandex_music import Track, Album, Artist, Playlist
from MusicBot.database import BaseUsersDatabase, BaseGuildsDatabase
from MusicBot.ui import ListenView, MyPlaylists, generate_playlists_embed
from MusicBot.ui import ListenView
from MusicBot.cogs.utils.embeds import generate_item_embed
users_db = BaseUsersDatabase()
def setup(bot):
bot.add_cog(General(bot))
async def get_search_suggestions(ctx: discord.AutocompleteContext) -> list[str]:
if not ctx.interaction.user or not ctx.value or len(ctx.value) < 2:
return []
users_db = BaseUsersDatabase()
token = await users_db.get_ym_token(ctx.interaction.user.id)
if not token:
logging.info(f"[GENERAL] User {ctx.interaction.user.id} has no token")
@@ -56,20 +57,35 @@ async def get_search_suggestions(ctx: discord.AutocompleteContext) -> list[str]:
for item in search.playlists.results:
res.append(f"{item.title}")
elif content_type == "Свой плейлист":
if not client.me or not client.me.account or not client.me.account.uid:
logging.warning(f"Failed to get playlists for user {ctx.interaction.user.id}")
else:
playlists_list = await client.users_playlists_list(client.me.account.uid)
res = [playlist.title if playlist.title else 'Без названия' for playlist in playlists_list]
playlists_list = await client.users_playlists_list()
res = [playlist.title if playlist.title else 'Без названия' for playlist in playlists_list]
return res[:100]
async def get_user_playlists_suggestions(ctx: discord.AutocompleteContext) -> list[str]:
if not ctx.interaction.user or not ctx.value or len(ctx.value) < 2:
return []
token = await users_db.get_ym_token(ctx.interaction.user.id)
if not token:
logging.info(f"[GENERAL] User {ctx.interaction.user.id} has no token")
return []
try:
client = await YMClient(token).init()
except yandex_music.exceptions.UnauthorizedError:
logging.info(f"[GENERAL] User {ctx.interaction.user.id} provided invalid token")
return []
playlists_list = await client.users_playlists_list()
return [playlist.title for playlist in playlists_list if playlist.title and ctx.value in playlist.title][:100]
class General(Cog):
def __init__(self, bot: discord.Bot):
self.bot = bot
self.db = BaseGuildsDatabase()
self.users_db = BaseUsersDatabase()
self.users_db = users_db
account = discord.SlashCommandGroup("account", "Команды, связанные с аккаунтом.")
@@ -106,7 +122,6 @@ class General(Cog):
value="""`account`
`find`
`help`
`like`
`queue`
`settings`
`track`
@@ -116,26 +131,23 @@ class General(Cog):
embed.set_footer(text='©️ Bananchiki')
elif command == 'account':
embed.description += (
"Ввести токен от Яндекс Музыки. Его можно получить [здесь](https://github.com/MarshalX/yandex-music-api/discussions/513).\n"
"Ввести токен Яндекс Музыки. Его можно получить [здесь](https://github.com/MarshalX/yandex-music-api/discussions/513).\n"
"```/account login <token>```\n"
"Удалить токен из базы данных бота.\n```/account remove```\n"
"Получить ваши плейлисты. Чтобы добавить плейлист в очередь, используйте команду /find.\n```/account playlists```\n"
"Получить ваш плейлист.\n```/account playlist <название>```\n"
"Получить плейлист «Мне нравится».\n```/account likes```\n"
"Получить ваши рекомендации.\n```/account recommendations <тип>```\n"
)
elif command == 'find':
embed.description += (
"Вывести информацию о треке (по умолчанию), альбоме, авторе или плейлисте. Позволяет добавить музыку в очередь. "
"В названии можно уточнить автора или версию. Возвращается лучшее совпадение.\n```/find <название> <тип>```"
"В названии можно уточнить автора через «-». Возвращается лучшее совпадение.\n```/find <тип> <название>```"
)
elif command == 'help':
embed.description += (
"Вывести список всех команд.\n```/help```\n"
"Получить информацию о конкретной команде.\n```/help <команда>```"
)
elif command == 'like':
embed.description += (
"Добавить трек в плейлист «Мне нравится». Пользовательские треки из этого плейлиста игнорируются.\n```/like```"
)
elif command == 'queue':
embed.description += (
"Получить очередь треков. По 15 элементов на страницу.\n```/queue get```\n"
@@ -144,11 +156,12 @@ class General(Cog):
)
elif command == 'settings':
embed.description += (
"`Примечание`: Только пользователи с разрешением управления каналом могут менять настройки.\n\n"
"Получить текущие настройки.\n```/settings show```\n"
"Разрешить или запретить воспроизведение Explicit треков и альбомов. Если автор или плейлист содержат Explicit треки, убираются кнопки для доступа к ним.\n```/settings explicit```\n"
"Разрешить или запретить создание меню проигрывателя, когда в канале больше одного человека.\n```/settings menu```\n"
"Разрешить или запретить голосование.\n```/settings vote <тип голосования>```\n"
"`Примечание`: Только пользователи с разрешением управления каналом могут менять настройки."
"Разрешить или запретить голосование.\n```/settings vote <тип>```\n"
"Разрешить или запретить отключение/подключение бота к каналу участникам без прав управления каналом.\n```/settings connect```\n"
)
elif command == 'track':
embed.description += (
@@ -157,15 +170,16 @@ class General(Cog):
"Приостановить текущий трек.\n```/track pause```\n"
"Возобновить текущий трек.\n```/track resume```\n"
"Прервать проигрывание, удалить историю, очередь и текущий плеер.\n ```/track stop```\n"
"Добавить трек в плейлист «Мне нравится» или удалить его, если он уже там.\n```/track like```"
"Запустить Мою Волну по текущему треку.\n```/track vibe```"
)
elif command == 'voice':
embed.description += (
"`Примечание`: Доступность меню и Моей Волны зависит от настроек сервера.\n\n"
"Присоединить бота в голосовой канал. Требует разрешения управления каналом.\n```/voice join```\n"
"Заставить бота покинуть голосовой канал. Требует разрешения управления каналом.\n ```/voice leave```\n"
"Создать меню проигрывателя. По умолчанию работает только когда в канале один человек.\n```/voice menu```\n"
"Запустить Мою Волну. По умолчанию работает только когда в канале один человек.\n```/vibe```"
"Присоединить бота в голосовой канал.\n```/voice join```\n"
"Заставить бота покинуть голосовой канал.\n ```/voice leave```\n"
"Создать меню проигрывателя. \n```/voice menu```\n"
"Запустить станцию. Без уточнения станции, запускает Мою Волну.\n```/voice vibe <название станции>```"
)
else:
response_message = '❌ Неизвестная команда.'
@@ -173,7 +187,7 @@ class General(Cog):
await ctx.respond(response_message, embed=embed, ephemeral=True)
@account.command(description="Ввести токен от Яндекс Музыки.")
@account.command(description="Ввести токен Яндекс Музыки.")
@discord.option("token", type=discord.SlashCommandOptionType.string, description="Токен.")
async def login(self, ctx: discord.ApplicationContext, token: str) -> None:
logging.info(f"[GENERAL] Login command invoked by user {ctx.author.id} in guild {ctx.guild.id}")
@@ -190,9 +204,14 @@ class General(Cog):
logging.info(f"[GENERAL] Token saved for user {ctx.author.id}")
await ctx.respond(f'Привет, {about['account']['first_name']}!', delete_after=15, ephemeral=True)
@account.command(description="Удалить токен из датабазы бота.")
@account.command(description="Удалить токен из базы данных бота.")
async def remove(self, ctx: discord.ApplicationContext) -> None:
logging.info(f"[GENERAL] Remove command invoked by user {ctx.author.id} in guild {ctx.guild.id}")
if not await self.users_db.get_ym_token(ctx.user.id):
logging.info(f"[GENERAL] No token found for user {ctx.author.id}")
await ctx.respond('❌ Токен не указан.', delete_after=15, ephemeral=True)
return
await self.users_db.update(ctx.user.id, {'ym_token': None})
await ctx.respond(f'Токен был удалён.', delete_after=15, ephemeral=True)
@@ -228,30 +247,111 @@ class General(Cog):
logging.info(f"[GENERAL] Successfully fetched likes for user {ctx.user.id}")
await ctx.respond(embed=embed, view=ListenView(tracks))
@account.command(description="Получить ваши плейлисты.")
async def playlists(self, ctx: discord.ApplicationContext) -> None:
logging.info(f"[GENERAL] Playlists command invoked by user {ctx.user.id} in guild {ctx.guild_id}")
@account.command(description="Получить ваши рекомендации.")
@discord.option(
'тип',
parameter_name='content_type',
description="Вид рекомендаций.",
type=discord.SlashCommandOptionType.string,
choices=['Премьера', 'Плейлист дня', 'Дежавю']
)
async def recommendations(
self,
ctx: discord.ApplicationContext,
content_type: Literal['Премьера', 'Плейлист дня', 'Дежавю']
)-> None:
# NOTE: Recommendations can be accessed by using /find, but it's more convenient to have it in separate command.
logging.debug(f"[GENERAL] Recommendations command invoked by user {ctx.user.id} in guild {ctx.guild_id} for type '{content_type}'")
guild = await self.db.get_guild(ctx.guild_id)
token = await self.users_db.get_ym_token(ctx.user.id)
if not token:
await ctx.respond("❌ Укажите токен через /account login.", delete_after=15, ephemeral=True)
return
client = await YMClient(token).init()
if not client.me or not client.me.account or not client.me.account.uid:
search = await client.search(content_type, False, 'playlist')
if not search or not search.playlists:
logging.info(f"[GENERAL] Failed to fetch recommendations for user {ctx.user.id}")
await ctx.respond('❌ Что-то пошло не так. Повторите попытку позже.', delete_after=15, ephemeral=True)
return
playlists_list = await client.users_playlists_list(client.me.account.uid)
playlists: list[tuple[str, int]] = [
(playlist.title if playlist.title else 'Без названия', playlist.track_count if playlist.track_count else 0) for playlist in playlists_list
]
playlist = search.playlists.results[0]
if playlist is None:
logging.info(f"[GENERAL] Failed to fetch recommendations for user {ctx.user.id}")
await ctx.respond('❌ Что-то пошло не так. Повторите попытку позже.', delete_after=15, ephemeral=True)
await self.users_db.update(ctx.user.id, {'playlists': playlists, 'playlists_page': 0})
embed = generate_playlists_embed(0, playlists)
tracks = await playlist.fetch_tracks_async()
if not tracks:
logging.info(f"[GENERAL] User {ctx.user.id} search for '{content_type}' returned no tracks")
await ctx.respond("❌ Пустой плейлист.", delete_after=15, ephemeral=True)
return
logging.info(f"[GENERAL] Successfully fetched playlists for user {ctx.user.id}")
await ctx.respond(embed=embed, view=await MyPlaylists(ctx).init(), ephemeral=True)
embed = await generate_item_embed(playlist)
view = ListenView(playlist)
for track_short in playlist.tracks:
track = cast(Track, track_short.track)
if (track.explicit or track.content_warning) and not guild['allow_explicit']:
logging.info(f"[GENERAL] User {ctx.user.id} search for '{content_type}' returned explicit content and is not allowed on this server")
embed.set_footer(text="Воспроизведение недоступно, так как в плейлисте присутствуют Explicit треки")
view = None
break
await ctx.respond(embed=embed, view=view)
@account.command(description="Получить ваш плейлист.")
@discord.option(
"запрос",
parameter_name='name',
description="Название плейлиста.",
type=discord.SlashCommandOptionType.string,
autocomplete=discord.utils.basic_autocomplete(get_user_playlists_suggestions)
)
async def playlist(self, ctx: discord.ApplicationContext, name: str) -> None:
logging.info(f"[GENERAL] Playlists command invoked by user {ctx.user.id} in guild {ctx.guild_id}")
guild = await self.db.get_guild(ctx.guild_id, projection={'allow_explicit': 1})
token = await self.users_db.get_ym_token(ctx.user.id)
if not token:
logging.info(f"[GENERAL] No token found for user {ctx.user.id}")
await ctx.respond("❌ Укажите токен через /account login.", delete_after=15, ephemeral=True)
return
try:
client = await YMClient(token).init()
except yandex_music.exceptions.UnauthorizedError:
logging.info(f"[GENERAL] User {ctx.user.id} provided invalid token")
await ctx.respond("❌ Недействительный токен. Если это не так, попробуйте ещё раз.", delete_after=15, ephemeral=True)
return
playlists = await client.users_playlists_list()
playlist = next((playlist for playlist in playlists if playlist.title == name), None)
if not playlist:
logging.info(f"[GENERAL] User {ctx.user.id} playlist '{name}' not found")
await ctx.respond("❌ Плейлист не найден.", delete_after=15, ephemeral=True)
return
tracks = await playlist.fetch_tracks_async()
if not tracks:
logging.info(f"[GENERAL] User {ctx.user.id} playlist '{name}' is empty")
await ctx.respond("❌ Плейлист пуст.", delete_after=15, ephemeral=True)
return
embed = await generate_item_embed(playlist)
view = ListenView(playlist)
for track_short in playlist.tracks:
track = cast(Track, track_short.track)
if (track.explicit or track.content_warning) and not guild['allow_explicit']:
logging.info(f"[GENERAL] User {ctx.user.id} search for '{name}' returned explicit content and is not allowed on this server")
embed.set_footer(text="Воспроизведение недоступно, так как в плейлисте присутствуют Explicit треки")
view = None
break
await ctx.respond(embed=embed, view=view)
@discord.slash_command(description="Найти контент и отправить информацию о нём. Возвращается лучшее совпадение.")
@discord.option(
@@ -259,7 +359,7 @@ class General(Cog):
parameter_name='content_type',
description="Тип контента для поиска.",
type=discord.SlashCommandOptionType.string,
choices=['Трек', 'Альбом', 'Артист', 'Плейлист', 'Свой плейлист'],
choices=['Трек', 'Альбом', 'Артист', 'Плейлист'],
)
@discord.option(
"запрос",
@@ -271,9 +371,11 @@ class General(Cog):
async def find(
self,
ctx: discord.ApplicationContext,
content_type: Literal['Трек', 'Альбом', 'Артист', 'Плейлист', 'Свой плейлист'],
content_type: Literal['Трек', 'Альбом', 'Артист', 'Плейлист'],
name: str
) -> None:
# TODO: Improve explicit check by excluding bad tracks from the queue and not fully discard the artist/album/playlist.
logging.info(f"[GENERAL] Find command invoked by user {ctx.user.id} in guild {ctx.guild_id} for '{content_type}' with name '{name}'")
guild = await self.db.get_guild(ctx.guild_id, projection={'allow_explicit': 1})
@@ -290,89 +392,61 @@ class General(Cog):
await ctx.respond("❌ Недействительный токен. Если это не так, попробуйте ещё раз.", delete_after=15, ephemeral=True)
return
if content_type == 'Свой плейлист':
if not client.me or not client.me.account or not client.me.account.uid:
logging.warning(f"Failed to get user info for user {ctx.user.id}")
await ctx.respond("Не удалось получить информацию о пользователе.", delete_after=15, ephemeral=True)
return
result = await client.search(name, nocorrect=True)
if not result:
logging.warning(f"Failed to search for '{name}' for user {ctx.user.id}")
await ctx.respond("❌ Что-то пошло не так. Повторите попытку позже.", delete_after=15, ephemeral=True)
return
playlists = await client.users_playlists_list(client.me.account.uid)
result = next((playlist for playlist in playlists if playlist.title == name), None)
if not result:
logging.info(f"[GENERAL] User {ctx.user.id} playlist '{name}' not found")
await ctx.respond("❌ Плейлист не найден.", delete_after=15, ephemeral=True)
return
tracks = await result.fetch_tracks_async()
if content_type == 'Трек':
content = result.tracks
elif content_type == 'Альбом':
content = result.albums
elif content_type == 'Артист':
content = result.artists
elif content_type == 'Плейлист':
content = result.playlists
if not content:
logging.info(f"[GENERAL] User {ctx.user.id} search for '{name}' returned no results")
await ctx.respond("❌ По запросу ничего не найдено.", delete_after=15, ephemeral=True)
return
content = content.results[0]
embed = await generate_item_embed(content)
view = ListenView(content)
if isinstance(content, (Track, Album)) and (content.explicit or content.content_warning) and not guild['allow_explicit']:
logging.info(f"[GENERAL] User {ctx.user.id} search for '{name}' returned explicit content and is not allowed on this server")
await ctx.respond("❌ Explicit контент запрещён на этом сервере.", delete_after=15, ephemeral=True)
return
elif isinstance(content, Artist):
tracks = await content.get_tracks_async()
if not tracks:
logging.info(f"[GENERAL] User {ctx.user.id} playlist '{name}' is empty")
await ctx.respond("Плейлист пуст.", delete_after=15, ephemeral=True)
logging.info(f"[GENERAL] User {ctx.user.id} search for '{name}' returned no tracks")
await ctx.respond("Треки от этого исполнителя не найдены.", delete_after=15, ephemeral=True)
return
for track_short in tracks:
for track in tracks:
if (track.explicit or track.content_warning) and not guild['allow_explicit']:
logging.info(f"[GENERAL] User {ctx.user.id} search for '{name}' returned explicit content and is not allowed on this server")
view = None
embed.set_footer(text="Воспроизведение недоступно, так как у автора присутствуют Explicit треки")
break
elif isinstance(content, Playlist):
tracks = await content.fetch_tracks_async()
if not tracks:
logging.info(f"[GENERAL] User {ctx.user.id} search for '{name}' returned no tracks")
await ctx.respond("❌ Пустой плейлист.", delete_after=15, ephemeral=True)
return
for track_short in content.tracks:
track = cast(Track, track_short.track)
if (track.explicit or track.content_warning) and not guild['allow_explicit']:
logging.info(f"[GENERAL] User {ctx.user.id} playlist '{name}' contains explicit content and is not allowed on this server")
await ctx.respond("❌ Explicit контент запрещён на этом сервере.", delete_after=15, ephemeral=True)
return
embed = await generate_item_embed(result)
view = ListenView(result)
else:
result = await client.search(name, nocorrect=True)
if not result:
logging.warning(f"Failed to search for '{name}' for user {ctx.user.id}")
await ctx.respond("❌ Что-то пошло не так. Повторите попытку позже.", delete_after=15, ephemeral=True)
return
if content_type == 'Трек':
content = result.tracks
elif content_type == 'Альбом':
content = result.albums
elif content_type == 'Артист':
content = result.artists
elif content_type == 'Плейлист':
content = result.playlists
if not content:
logging.info(f"[GENERAL] User {ctx.user.id} search for '{name}' returned no results")
await ctx.respond("❌ По запросу ничего не найдено.", delete_after=15, ephemeral=True)
return
content = content.results[0]
embed = await generate_item_embed(content)
view = ListenView(content)
if isinstance(content, (Track, Album)) and (content.explicit or content.content_warning) and not guild['allow_explicit']:
logging.info(f"[GENERAL] User {ctx.user.id} search for '{name}' returned explicit content and is not allowed on this server")
await ctx.respond("❌ Explicit контент запрещён на этом сервере.", delete_after=15, ephemeral=True)
return
elif isinstance(content, Artist):
tracks = await content.get_tracks_async()
if not tracks:
logging.info(f"[GENERAL] User {ctx.user.id} search for '{name}' returned no tracks")
await ctx.respond("❌ Треки от этого исполнителя не найдены.", delete_after=15, ephemeral=True)
return
for track in tracks:
if (track.explicit or track.content_warning) and not guild['allow_explicit']:
logging.info(f"[GENERAL] User {ctx.user.id} search for '{name}' returned explicit content and is not allowed on this server")
view = None
embed.set_footer(text="Воспроизведение недоступно, так как у автора присутствуют Explicit треки")
break
elif isinstance(content, Playlist):
tracks = await content.fetch_tracks_async()
if not tracks:
logging.info(f"[GENERAL] User {ctx.user.id} search for '{name}' returned no tracks")
await ctx.respond("❌ Пустой плейлист.", delete_after=15, ephemeral=True)
return
for track_short in content.tracks:
track = cast(Track, track_short.track)
if (track.explicit or track.content_warning) and not guild['allow_explicit']:
logging.info(f"[GENERAL] User {ctx.user.id} search for '{name}' returned explicit content and is not allowed on this server")
view = None
embed.set_footer(text="Воспроизведение недоступно, так как у автора присутствуют Explicit треки")
break
logging.info(f"[GENERAL] User {ctx.user.id} search for '{name}' returned explicit content and is not allowed on this server")
view = None
embed.set_footer(text="Воспроизведение недоступно, так как в плейлисте присутствуют Explicit треки")
break
logging.info(f"[GENERAL] Successfully generated '{content_type}' message for user {ctx.author.id}")
await ctx.respond(embed=embed, view=view)

View File

@@ -99,10 +99,13 @@ async def _generate_track_embed(track: Track) -> Embed:
artist_url = f"https://music.yandex.ru/artist/{artist.id}"
artist_cover = artist.cover
if not artist_cover:
if not artist_cover and artist.op_image:
artist_cover_url = artist.get_op_image_url()
else:
elif artist_cover:
artist_cover_url = artist_cover.get_url()
else:
artist_cover_url = None
embed = Embed(
title=title,
@@ -172,10 +175,13 @@ async def _generate_album_embed(album: Album) -> Embed:
artist_url = f"https://music.yandex.ru/artist/{artist.id}"
artist_cover = artist.cover
if not artist_cover:
if not artist_cover and artist.op_image:
artist_cover_url = artist.get_op_image_url()
else:
elif artist_cover:
artist_cover_url = artist_cover.get_url()
else:
artist_cover_url = None
embed = Embed(
title=title,
@@ -264,26 +270,28 @@ async def _generate_playlist_embed(playlist: Playlist) -> Embed:
title = cast(str, playlist.title)
track_count = playlist.track_count
avail = cast(bool, playlist.available)
description = playlist.description_formatted
description = playlist.description
year = playlist.created
modified = playlist.modified
duration = playlist.duration_ms
likes_count = playlist.likes_count
color = 0x000
cover_url = None
if playlist.cover and playlist.cover.uri:
cover_url = f"https://{playlist.cover.uri.replace('%%', '400x400')}"
else:
tracks = await playlist.fetch_tracks_async()
for i in range(len(tracks)):
track = tracks[i].track
if not track or not track.albums or not track.albums[0].cover_uri:
continue
for track_short in tracks:
track = track_short.track
if track and track.albums and track.albums[0].cover_uri:
cover_url = f"https://{track.albums[0].cover_uri.replace('%%', '400x400')}"
break
if cover_url:
color = await _get_average_color_from_url(cover_url)
else:
color = 0x000
embed = Embed(
title=title,

View File

@@ -3,7 +3,6 @@ import aiofiles
import logging
import io
from typing import Any, Literal, cast
from time import time
import yandex_music.exceptions
from yandex_music import Track, TrackShort, ClientAsync as YMClient
@@ -24,16 +23,17 @@ class VoiceExtension:
self.db = VoiceGuildsDatabase()
self.users_db = BaseUsersDatabase()
async def send_menu_message(self, ctx: ApplicationContext | Interaction) -> bool:
async def send_menu_message(self, ctx: ApplicationContext | Interaction, *, disable: bool = False) -> bool:
"""Send menu message to the channel and delete old menu message if exists. Return True if sent.
Args:
ctx (ApplicationContext | Interaction): Context.
disable (bool, optional): Disable menu message. Defaults to False.
Returns:
bool: True if sent, False if not.
"""
logging.info("[VC_EXT] Sending menu message")
logging.info(f"[VC_EXT] Sending menu message to channel {ctx.channel_id} in guild {ctx.guild_id}")
if not ctx.guild_id:
logging.warning("[VC_EXT] Guild id not found in context inside 'create_menu'")
@@ -42,14 +42,16 @@ class VoiceExtension:
guild = await self.db.get_guild(ctx.guild_id, projection={'current_track': 1, 'current_menu': 1, 'vibing': 1})
if guild['current_track']:
if not (vc := await self.get_voice_client(ctx)):
return False
track = cast(Track, Track.de_json(
guild['current_track'],
client=YMClient() # type: ignore
))
embed = await generate_item_embed(track, guild['vibing'])
vc = await self.get_voice_client(ctx)
if vc and vc.is_paused():
if vc.is_paused():
embed.set_footer(text='Приостановлено')
else:
embed.remove_footer()
@@ -62,7 +64,7 @@ class VoiceExtension:
if message:
await message.delete()
await self._update_menu_views_dict(ctx)
await self._update_menu_views_dict(ctx, disable=disable)
interaction = await ctx.respond(view=menu_views[ctx.guild_id], embed=embed)
response = await interaction.original_response() if isinstance(interaction, discord.Interaction) else interaction
await self.db.update(ctx.guild_id, {'current_menu': response.id})
@@ -81,7 +83,7 @@ class VoiceExtension:
Returns:
(discord.Message | None): Menu message or None.
"""
logging.debug(f"[VC_EXT] Fetching menu message {menu_mid}...")
logging.debug(f"[VC_EXT] Fetching menu message {menu_mid} in guild {ctx.guild_id}")
if not ctx.guild_id:
logging.warning("[VC_EXT] Guild ID not found in context")
@@ -104,9 +106,9 @@ class VoiceExtension:
return None
if menu:
logging.debug("[VC_EXT] Menu message found")
logging.debug(f"[VC_EXT] Menu message {menu_mid} successfully fetched")
else:
logging.debug("[VC_EXT] Menu message not found. Resetting current_menu field.")
logging.debug(f"[VC_EXT] Menu message {menu_mid} not found in guild {ctx.guild_id}")
await self.db.update(ctx.guild_id, {'current_menu': None})
return menu
@@ -151,11 +153,9 @@ class VoiceExtension:
if not menu_mid:
logging.warning("[VC_EXT] No menu message or menu message id provided")
return False
menu = await self.get_menu_message(ctx, menu_mid)
else:
menu = menu_message
menu_message = await self.get_menu_message(ctx, menu_mid)
if not menu:
if not menu_message:
return False
if not guild['current_track']:
@@ -164,9 +164,8 @@ class VoiceExtension:
track = cast(Track, Track.de_json(
guild['current_track'],
client=YMClient(), # type: ignore
client=YMClient() # type: ignore
))
embed = await generate_item_embed(track, guild['vibing'])
await self._update_menu_views_dict(ctx)
@@ -176,12 +175,12 @@ class VoiceExtension:
await ctx.edit(embed=embed, view=menu_views[gid])
else:
# If interaction from other buttons or commands. They should have their own response.
await menu.edit(embed=embed, view=menu_views[gid])
await menu_message.edit(embed=embed, view=menu_views[gid])
except discord.NotFound:
logging.warning("[VC_EXT] Menu message not found")
return False
logging.debug("[VC_EXT] Menu embed updated")
logging.debug("[VC_EXT] Menu embed updated successfully")
return True
async def update_menu_view(
@@ -225,12 +224,14 @@ class VoiceExtension:
except discord.NotFound:
logging.warning("[VC_EXT] Menu message not found")
return False
logging.debug("[VC_EXT] Menu view updated successfully")
return True
async def update_vibe(
self,
ctx: ApplicationContext | Interaction,
type: Literal['track', 'album', 'artist', 'playlist', 'user'],
type: str,
id: str | int,
*,
update_settings: bool = False
@@ -240,15 +241,15 @@ class VoiceExtension:
Args:
ctx (ApplicationContext | Interaction): Context.
type (Literal['track', 'album', 'artist', 'playlist', 'user']): Type of the item.
id (str | int): ID of the YandexMusic item.
type (str): Type of the item.
id (str | int): ID of the item.
update_settings (bool, optional): Update vibe settings by sending feedack usind data from database. Defaults to False.
Returns:
bool: True if vibe was updated successfully. False otherwise.
"""
logging.info(f"[VC_EXT] Updating vibe for guild {ctx.guild_id} with type '{type}' and id '{id}'")
gid = ctx.guild_id
uid = ctx.user_id if isinstance(ctx, discord.RawReactionActionEvent) else ctx.user.id if ctx.user else None
@@ -275,12 +276,9 @@ class VoiceExtension:
)
if not guild['vibing']:
logging.debug(f"[VIBE] Starting radio '{type}:{id}'")
feedback = await client.rotor_station_feedback_radio_started(
f"{type}:{id}",
f"desktop-user-{client.me.account.uid}", # type: ignore
timestamp=time()
f"desktop-user-{client.me.account.uid}", # type: ignore # That's made up, but it doesn't do much anyway.
)
if not feedback:
@@ -430,14 +428,15 @@ class VoiceExtension:
logging.warning("Guild ID or User ID not found in context")
return None
guild = await self.db.get_guild(gid, projection={'current_menu': 1, 'vibing': 1})
guild = await self.db.get_guild(gid, projection={'current_menu': 1, 'vibing': 1, 'current_track': 1})
vc = await self.get_voice_client(ctx) if not vc else vc
if not vc:
return None
try:
await self._download_track(gid, track)
if not guild['current_track'] or track.id != guild['current_track']['id']:
await self._download_track(gid, track)
except yandex_music.exceptions.TimedOutError:
logging.warning(f"[VC_EXT] Timed out while downloading track '{track.title}'")
@@ -457,6 +456,7 @@ class VoiceExtension:
await self.db.set_current_track(gid, track)
if menu_message or guild['current_menu']:
# Updating menu message before playing to prevent delay and avoid FFMPEG lags.
await self.update_menu_full(ctx, guild['current_menu'], menu_message=menu_message, button_callback=button_callback)
if not guild['vibing']:
@@ -503,9 +503,11 @@ class VoiceExtension:
user = await self.users_db.get_user(uid, projection={'vibe_type': 1, 'vibe_id': 1, 'vibe_batch_id': 1, 'ym_token': 1})
vc = await self.get_voice_client(ctx) if not vc else vc
if vc:
await self.db.update(gid, {'current_track': None, 'is_stopped': True})
vc.stop()
if not vc:
return False
await self.db.update(gid, {'current_track': None, 'is_stopped': True})
vc.stop()
if full:
if not await self._full_stop(ctx, guild, gid):
@@ -580,10 +582,10 @@ class VoiceExtension:
logging.debug("[VC_EXT] Repeating current track")
next_track = guild['current_track']
elif guild['shuffle']:
logging.debug("[VC_EXT] Shuffling tracks")
logging.debug("[VC_EXT] Getting random track from queue")
next_track = await self.db.pop_random_track(gid, 'next')
else:
logging.debug("[VC_EXT] Getting next track")
logging.debug("[VC_EXT] Getting next track from queue")
next_track = await self.db.get_track(gid, 'next')
if not next_track and guild['vibing'] and not isinstance(ctx, discord.RawReactionActionEvent):
@@ -854,8 +856,7 @@ class VoiceExtension:
feedback = await client.rotor_station_feedback_track_started(
f"{user['vibe_type']}:{user['vibe_id']}",
track.id,
user['vibe_batch_id'], # type: ignore # wrong typehints
time()
user['vibe_batch_id'] # type: ignore # Wrong typehints
)
logging.debug(f"[VIBE] Track started feedback: {feedback}")
return True
@@ -898,8 +899,7 @@ class VoiceExtension:
f"{user['vibe_type']}:{user['vibe_id']}",
track['id'],
track['duration_ms'] // 1000,
cast(str, user['vibe_batch_id']),
time()
user['vibe_batch_id'] # type: ignore # Wrong typehints
)
logging.info(f"[VOICE] User {user['_id']} finished vibing with result: {res}")
return True
@@ -941,23 +941,21 @@ class VoiceExtension:
f'{user['vibe_type']}:{user['vibe_id']}',
guild['current_track']['id'],
guild['current_track']['duration_ms'] // 1000,
user['vibe_batch_id'], # type: ignore # Wrong typehints
time()
user['vibe_batch_id'] # type: ignore # Wrong typehints
)
logging.debug(f"[VIBE] Finished track: {feedback}")
logging.debug(f"[VIBE] Finished track feeedback: {feedback}")
else:
feedback = await client.rotor_station_feedback_skip(
f'{user['vibe_type']}:{user['vibe_id']}',
guild['current_track']['id'],
guild['current_track']['duration_ms'] // 1000,
user['vibe_batch_id'], # type: ignore # Wrong typehints
time()
user['vibe_batch_id'] # type: ignore # Wrong typehints
)
if not feedback:
logging.warning("[VIBE] Failed to send vibe feedback")
return False
logging.debug(f"[VIBE] Skipped track: {feedback}")
logging.debug(f"[VIBE] Skipped track feeedback: {feedback}")
feedback = await self.update_vibe(
ctx,
user['vibe_type'],
@@ -988,18 +986,21 @@ class VoiceExtension:
Returns:
str | None: Song title or None.
"""
logging.debug("[VC_EXT] Playing next track")
client = await self.init_ym_client(ctx) if not client else client
if not client:
return None
if not vc:
vc = await self.get_voice_client(ctx)
if not await self.stop_playing(ctx, vc=vc):
return None
ym_track = cast(Track, Track.de_json(
next_track,
client=client # type: ignore # Async client can be used here.
))
await self.stop_playing(ctx, vc=vc)
return await self.play_track(
ctx,
ym_track,

View File

@@ -4,12 +4,37 @@ from typing import cast
import discord
from discord.ext.commands import Cog
from yandex_music import ClientAsync as YMClient
from yandex_music.exceptions import UnauthorizedError
from MusicBot.database import BaseUsersDatabase
from MusicBot.cogs.utils import VoiceExtension, menu_views
from MusicBot.ui import QueueView, generate_queue_embed
def setup(bot: discord.Bot):
bot.add_cog(Voice(bot))
users_db = BaseUsersDatabase()
async def get_vibe_stations_suggestions(ctx: discord.AutocompleteContext) -> list[str]:
if not ctx.interaction.user or not ctx.value or len(ctx.value) < 2:
return []
token = await users_db.get_ym_token(ctx.interaction.user.id)
if not token:
logging.info(f"[GENERAL] User {ctx.interaction.user.id} has no token")
return []
try:
client = await YMClient(token).init()
except UnauthorizedError:
logging.info(f"[GENERAL] User {ctx.interaction.user.id} provided invalid token")
return []
stations = await client.rotor_stations_list()
return [station.station.name for station in stations if station.station and ctx.value in station.station.name][:100]
class Voice(Cog, VoiceExtension):
voice = discord.SlashCommandGroup("voice", "Команды, связанные с голосовым каналом.")
@@ -257,14 +282,17 @@ class Voice(Cog, VoiceExtension):
await ctx.respond("У вас нет прав для выполнения этой команды.", delete_after=15, ephemeral=True)
return
if (vc := await self.get_voice_client(ctx)) and await self.voice_check(ctx):
if (vc := await self.get_voice_client(ctx)) and await self.voice_check(ctx) and vc.is_connected:
res = await self.stop_playing(ctx, full=True)
if res:
await vc.disconnect(force=True)
await ctx.respond("Отключение успешно!", delete_after=15, ephemeral=True)
logging.info(f"[VOICE] Successfully disconnected from voice channel in guild {ctx.guild.id}")
return
else:
await ctx.respond("Не удалось отключиться.", delete_after=15, ephemeral=True)
else:
await ctx.respond("❌ Бот не подключен к голосовому каналу.", delete_after=15, ephemeral=True)
@queue.command(description="Очистить очередь треков и историю прослушивания.")
async def clear(self, ctx: discord.ApplicationContext) -> None:
@@ -436,13 +464,19 @@ class Voice(Cog, VoiceExtension):
if not await self.voice_check(ctx):
return
guild = await self.db.get_guild(ctx.guild.id, projection={'always_allow_menu': 1, 'current_track': 1, 'current_menu': 1})
guild = await self.db.get_guild(ctx.guild.id, projection={'always_allow_menu': 1, 'current_track': 1, 'current_menu': 1, 'vibing': 1})
channel = cast(discord.VoiceChannel, ctx.channel)
if len(channel.members) > 2 and not guild['always_allow_menu']:
logging.info(f"[VOICE] Action declined: other members are present in the voice channel")
await ctx.respond("❌ Вы не единственный в голосовом канале.", ephemeral=True)
return
if guild['vibing']:
logging.info(f"[VOICE] Action declined: vibing is already enabled in guild {ctx.guild.id}")
await ctx.respond("❌ Моя Волна уже включена. Используйте /track stop, чтобы остановить воспроизведение.", ephemeral=True)
return
if not guild['current_track']:
logging.info(f"[VOICE] No current track in {ctx.guild.id}")
await ctx.respond("❌ Нет воспроизводимого трека.", ephemeral=True)
@@ -452,35 +486,81 @@ class Voice(Cog, VoiceExtension):
if not feedback:
await ctx.respond("❌ Операция не удалась. Возможно, у вес нет подписки на Яндекс Музыку.", ephemeral=True)
return
if not guild['current_menu']:
await self.send_menu_message(ctx)
await self.send_menu_message(ctx, disable=True)
next_track = await self.db.get_track(ctx.guild_id, 'next')
if next_track:
await self._play_next_track(ctx, next_track)
@voice.command(name='vibe', description="Запустить Мою Волну.")
async def user_vibe(self, ctx: discord.ApplicationContext) -> None:
@discord.option(
"запрос",
parameter_name='name',
description="Название станции.",
type=discord.SlashCommandOptionType.string,
autocomplete=discord.utils.basic_autocomplete(get_vibe_stations_suggestions),
required=False
)
async def user_vibe(self, ctx: discord.ApplicationContext, name: str | None = None) -> None:
logging.info(f"[VOICE] Vibe (user) command invoked by user {ctx.user.id} in guild {ctx.guild_id}")
if not await self.voice_check(ctx):
return
guild = await self.db.get_guild(ctx.guild.id, projection={'always_allow_menu': 1, 'current_menu': 1})
guild = await self.db.get_guild(ctx.guild.id, projection={'always_allow_menu': 1, 'current_menu': 1, 'vibing': 1})
channel = cast(discord.VoiceChannel, ctx.channel)
if len(channel.members) > 2 and not guild['always_allow_menu']:
logging.info(f"[VOICE] Action declined: other members are present in the voice channel")
await ctx.respond("❌ Вы не единственный в голосовом канале.", ephemeral=True)
return
if guild['vibing']:
logging.info(f"[VOICE] Action declined: vibing is already enabled in guild {ctx.guild.id}")
await ctx.respond("❌ Моя Волна уже включена. Используйте /track stop, чтобы остановить воспроизведение.", ephemeral=True)
return
if name:
token = await users_db.get_ym_token(ctx.user.id)
if not token:
logging.info(f"[GENERAL] User {ctx.user.id} has no token")
return
try:
client = await YMClient(token).init()
except UnauthorizedError:
logging.info(f"[GENERAL] User {ctx.user.id} provided invalid token")
return
stations = await client.rotor_stations_list()
for content in stations:
if content.station and content.station.name == name and content.ad_params:
break
else:
content = None
if not content:
logging.debug(f"[VOICE] Station {name} not found")
await ctx.respond("❌ Станция не найдена.", ephemeral=True)
return
_type, _id = content.ad_params.other_params.split(':') if content.ad_params else (None, None)
if not _type or not _id:
logging.debug(f"[VOICE] Station {name} has no ad params")
await ctx.respond("❌ Станция не найдена.", ephemeral=True)
return
feedback = await self.update_vibe(ctx, _type, _id)
else:
feedback = await self.update_vibe(ctx, 'user', 'onyourwave')
feedback = await self.update_vibe(ctx, 'user', 'onyourwave')
if not feedback:
await ctx.respond("❌ Операция не удалась. Возможно, у вес нет подписки на Яндекс Музыку.", ephemeral=True)
return
if not guild['current_menu']:
await self.send_menu_message(ctx)
await self.send_menu_message(ctx, disable=True)
next_track = await self.db.get_track(ctx.guild_id, 'next')
if next_track:

View File

@@ -81,7 +81,7 @@ class BaseGuildsDatabase:
is_stopped=True,
allow_explicit=True,
always_allow_menu=False,
allow_connect=False,
allow_connect=True,
vote_next_track=True,
vote_add_track=True,
vote_add_album=True,

View File

@@ -17,17 +17,18 @@ cogs_list = [
@bot.event
async def on_ready():
logging.info("Bot's ready!")
await bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name="/voice vibe"))
if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()
try:
import coloredlogs
coloredlogs.install(level=logging.DEBUG)
except ImportError:
pass
if os.getenv('DEBUG') == 'True':
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger('discord').setLevel(logging.INFO)
@@ -38,14 +39,14 @@ if __name__ == '__main__':
logging.getLogger('discord').setLevel(logging.WARNING)
logging.getLogger('pymongo').setLevel(logging.WARNING)
logging.getLogger('yandex_music').setLevel(logging.WARNING)
if not os.path.exists('music'):
os.mkdir('music')
token = os.getenv('TOKEN')
if not token:
raise ValueError('You must specify the bot TOKEN in your enviroment')
for cog in cogs_list:
bot.load_extension(f'MusicBot.cogs.{cog}')
bot.run(token)

View File

@@ -1,12 +1,10 @@
from .other import MyPlaylists, QueueView, generate_queue_embed, generate_playlists_embed
from .other import QueueView, generate_queue_embed
from .menu import MenuView
from .find import ListenView
__all__ = [
'MyPlaylists',
'QueueView',
'MenuView',
'ListenView',
'generate_queue_embed',
'generate_playlists_embed'
'generate_queue_embed'
]

View File

@@ -153,17 +153,33 @@ class MyVibeButton(Button, VoiceExtension):
await interaction.respond("❌ Вы не единственный в голосовом канале.", ephemeral=True)
return
track_type_map: dict[Any, Literal['track', 'album', 'artist', 'playlist', 'user']] = {
track_type_map = {
Track: 'track', Album: 'album', Artist: 'artist', Playlist: 'playlist', list: 'user'
} # NOTE: Likes playlist should have its own entry instead of 'user:onyourwave'
}
await self.send_menu_message(interaction)
if isinstance(self.item, Playlist):
if not self.item.owner:
logging.warning(f"[VIBE] Playlist owner is None")
await interaction.respond("Не удалось получить информацию о плейлисте.", ephemeral=True)
return
_id = self.item.owner.login + '_' + str(self.item.kind)
elif not isinstance(self.item, list):
_id = cast(int | str, self.item.id)
else:
_id = 'onyourwave'
await self.send_menu_message(interaction, disable=True)
await self.update_vibe(
interaction,
track_type_map[type(self.item)],
cast(int, self.item.uid) if isinstance(self.item, Playlist) else cast(int | str, self.item.id) if not isinstance(self.item, list) else 'onyourwave'
_id
)
next_track = await self.db.get_track(gid, 'next')
if next_track:
await self._play_next_track(interaction, next_track)
class ListenView(View):
def __init__(self, item: Track | Album | Artist | Playlist | list[Track], *items: Item, timeout: float | None = 360, disable_on_timeout: bool = True):
super().__init__(*items, timeout=timeout, disable_on_timeout=disable_on_timeout)
@@ -195,7 +211,7 @@ class ListenView(View):
self.add_item(self.button2)
self.add_item(self.button3)
self.add_item(self.button4)
async def on_timeout(self) -> None:
try:
return await super().on_timeout()

View File

@@ -239,11 +239,11 @@ class MyVibeSelect(Select, VoiceExtension):
await interaction.edit(view=view)
class MyVibeSettingsView(View, VoiceExtension):
def __init__(self, interaction: Interaction, *items: Item, timeout: float | None = 360, disable_on_timeout: bool = True):
def __init__(self, interaction: Interaction, *items: Item, timeout: float | None = None, disable_on_timeout: bool = True):
View.__init__(self, *items, timeout=timeout, disable_on_timeout=disable_on_timeout)
VoiceExtension.__init__(self, None)
self.interaction = interaction
async def init(self) -> Self:
if not self.interaction.user:
logging.warning('[MENU] No user in settings view')

View File

@@ -6,19 +6,6 @@ from discord import ApplicationContext, ButtonStyle, Interaction, Embed
from MusicBot.cogs.utils.voice_extension import VoiceExtension
def generate_playlists_embed(page: int, playlists: list[tuple[str, int]]) -> Embed:
count = 15 * page
length = len(playlists)
embed = Embed(
title=f"Всего плейлистов: {length}",
color=0xfed42b
)
embed.set_author(name="Ваши плейлисты")
embed.set_footer(text=f"Страница {page + 1} из {ceil(length / 10)}")
for playlist in playlists[count:count + 10]:
embed.add_field(name=playlist[0], value=f"{playlist[1]} треков", inline=False)
return embed
def generate_queue_embed(page: int, tracks_list: list[dict[str, Any]]) -> Embed:
count = 15 * page
length = len(tracks_list)
@@ -36,61 +23,6 @@ def generate_queue_embed(page: int, tracks_list: list[dict[str, Any]]) -> Embed:
embed.add_field(name=f"{i} - {track['title']} - {duration_m}:{duration_s:02d}", value="", inline=False)
return embed
class MPNextButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
if not interaction.user:
return
user = await self.users_db.get_user(interaction.user.id)
page = user['playlists_page'] + 1
await self.users_db.update(interaction.user.id, {'playlists_page': page})
embed = generate_playlists_embed(page, user['playlists'])
await interaction.edit(embed=embed, view=await MyPlaylists(interaction).init())
class MPPrevButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self, None)
async def callback(self, interaction: Interaction) -> None:
if not interaction.user:
return
user = await self.users_db.get_user(interaction.user.id)
page = user['playlists_page'] - 1
await self.users_db.update(interaction.user.id, {'playlists_page': page})
embed = generate_playlists_embed(page, user['playlists'])
await interaction.edit(embed=embed, view=await MyPlaylists(interaction).init())
class MyPlaylists(View, VoiceExtension):
def __init__(self, ctx: ApplicationContext | Interaction, *items: Item, timeout: float | None = 360, disable_on_timeout: bool = True):
View.__init__(self, *items, timeout=timeout, disable_on_timeout=disable_on_timeout)
VoiceExtension.__init__(self, None)
self.ctx = ctx
self.next_button = MPNextButton(style=ButtonStyle.primary, emoji='▶️')
self.prev_button = MPPrevButton(style=ButtonStyle.primary, emoji='◀️')
async def init(self) -> Self:
if not self.ctx.user:
return self
user = await self.users_db.get_user(self.ctx.user.id)
count = 10 * user['playlists_page']
if not user['playlists'][count + 10:]:
self.next_button.disabled = True
if not user['playlists'][:count]:
self.prev_button.disabled = True
self.add_item(self.prev_button)
self.add_item(self.next_button)
return self
class QueueNextButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)