impr: Add player repeat and shuffle, play artist and playlist. Code optimization.

This commit is contained in:
Lemon4ksan
2025-01-12 22:14:31 +03:00
parent a108799e63
commit b79e16fddf
9 changed files with 505 additions and 253 deletions

View File

@@ -9,8 +9,8 @@ from yandex_music import ClientAsync as YMClient
from MusicBot.database import BaseUsersDatabase
from MusicBot.cogs.utils.find import (
proccess_album, process_track, process_artist,
ListenAlbum, ListenTrack, ListenArtist
process_album, process_track, process_artist, process_playlist,
ListenAlbum, ListenTrack, ListenArtist, ListenPlaylist
)
def setup(bot):
@@ -22,7 +22,7 @@ class General(Cog):
self.bot = bot
self.db = BaseUsersDatabase()
@discord.slash_command(description="Login to Yandex Music using access token.", guild_ids=[1247100229535141899])
@discord.slash_command(description="Войти в Yandex Music с помощью токена.")
@discord.option("token", type=discord.SlashCommandOptionType.string)
async def login(self, ctx: discord.ApplicationContext, token: str) -> None:
try:
@@ -36,45 +36,55 @@ class General(Cog):
self.db.update(uid, {'ym_token': token})
await ctx.respond(f'Привет, {about['account']['first_name']}!', delete_after=15, ephemeral=True)
@discord.slash_command(description="Find the content type by its name and send info about it. The best match is returned.", guild_ids=[1247100229535141899])
@discord.slash_command(description="Найти контент и отправить информацию о нём. Возвращается лучшее совпадение.")
@discord.option(
"name",
description="Name of the content to find",
description="Название контента для поиска",
type=discord.SlashCommandOptionType.string
)
@discord.option(
"content_type",
description="Type of the conent to find (artist, album, track, playlist).",
description="Тип искомого контента (artist, album, track, playlist).",
type=discord.SlashCommandOptionType.string,
default='track'
)
async def find(self, ctx: discord.ApplicationContext, name: str, content_type: str = 'track') -> None:
if content_type not in ('artist', 'album', 'track', 'playlist'):
await ctx.respond('❌ Недопустимый тип.')
await ctx.respond("❌ Недопустимый тип.", delete_after=15, ephemeral=True)
return
token = self.db.get_ym_token(ctx.user.id)
if not token:
await ctx.respond('❌ Необходимо указать свой токен доступа с помощью комманды /login.', delete_after=15, ephemeral=True)
await ctx.respond("❌ Необходимо указать свой токен доступа с помощью комманды /login.", delete_after=15, ephemeral=True)
return
try:
client = await YMClient(token).init()
except yandex_music.exceptions.UnauthorizedError:
await ctx.respond('❌ Недействительный токен. Если это не так, попробуйте ещё раз.', delete_after=15, ephemeral=True)
await ctx.respond("❌ Недействительный токен. Если это не так, попробуйте ещё раз.", delete_after=15, ephemeral=True)
return
result = await client.search(name, True, content_type)
if not result:
await ctx.respond("❌ Что-то пошло не так. Повторите попытку позже", delete_after=15, ephemeral=True)
return
if content_type == 'album':
album = result.albums.results[0] # type: ignore
embed = await proccess_album(album)
await ctx.respond("", embed=embed, view=ListenAlbum(album), delete_after=360)
elif content_type == 'track':
track: yandex_music.Track = result.tracks.results[0] # type: ignore
if content_type == 'album' and result.albums:
album = result.albums.results[0]
embed = await process_album(album)
await ctx.respond(embed=embed, view=ListenAlbum(album))
elif content_type == 'track' and result.tracks:
track: yandex_music.Track = result.tracks.results[0]
album_id = cast(int, track.albums[0].id)
embed = await process_track(track)
await ctx.respond("", embed=embed, view=ListenTrack(track, album_id), delete_after=360)
elif content_type == 'artist':
artist = result.artists.results[0] # type: ignore
await ctx.respond(embed=embed, view=ListenTrack(track, album_id))
elif content_type == 'artist' and result.artists:
artist = result.artists.results[0]
embed = await process_artist(artist)
await ctx.respond("", embed=embed, view=ListenArtist(artist.id), delete_after=360)
await ctx.respond(embed=embed, view=ListenArtist(artist))
elif content_type == 'playlist' and result.playlists:
playlist = result.playlists.results[0]
embed = await process_playlist(playlist)
await ctx.respond(embed=embed, view=ListenPlaylist(playlist))
else:
await ctx.respond("❌ По запросу ничего не найдено.", delete_after=15, ephemeral=True)

View File

@@ -1,11 +1,12 @@
from os import getenv
from math import ceil
from typing import cast
import discord
from yandex_music import Track, Album, Artist, Label
from yandex_music import Track, Album, Artist, Playlist, Label
from discord.ui import View, Button, Item
from discord import ButtonStyle, Interaction
from discord import ButtonStyle, Interaction, Embed
from MusicBot.cogs.utils.voice import VoiceExtension, get_average_color_from_url
@@ -19,20 +20,22 @@ class PlayTrackButton(Button, VoiceExtension):
async def callback(self, interaction: Interaction) -> None:
if not interaction.guild or not await self.voice_check(interaction):
return
gid = interaction.guild.id
guild = self.db.get_guild(gid)
if guild['current_track'] is not None:
if guild['current_track']:
self.db.modify_track(gid, self.track, 'next', 'append')
if guild['current_player'] is not None and interaction.message:
await interaction.message.delete()
await interaction.respond(f"Трек **{self.track.title}** был добавлен в очередь.", delete_after=15)
response_message = f"Трек **{self.track.title}** был добавлен в очередь."
else:
title = await self.play_track(interaction, self.track)
if title:
if guild['current_player'] is not None and interaction.message:
await interaction.message.delete()
await interaction.respond(f"Сейчас играет: **{title}**!", delete_after=15)
await self.play_track(interaction, self.track)
response_message = f"Сейчас играет: **{self.track.title}**!"
if guild['current_player'] is not None and interaction.message:
await interaction.message.delete()
await interaction.respond(response_message, delete_after=15)
class PlayAlbumButton(Button, VoiceExtension):
@@ -44,152 +47,149 @@ class PlayAlbumButton(Button, VoiceExtension):
async def callback(self, interaction: Interaction) -> None:
if not interaction.guild or not await self.voice_check(interaction):
return
gid = interaction.guild.id
guild = self.db.get_guild(gid)
album = await self.album.with_tracks_async()
if not album or not album.volumes:
return
tracks: list[Track] = []
for volume in album.volumes:
tracks.extend(volume)
gid = interaction.guild.id
guild = self.db.get_guild(gid)
tracks: list[Track] = [track for volume in album.volumes for track in volume]
if guild['current_track'] is not None:
self.db.modify_track(gid, tracks, 'next', 'extend')
if guild['current_player'] is not None and interaction.message:
await interaction.message.delete()
else:
await interaction.respond(f"Альбом **{album.title}** был добавлен в очередь.", delete_after=15)
response_message = f"Альбом **{album.title}** был добавлен в очередь."
else:
track = tracks.pop(0)
self.db.modify_track(gid, tracks, 'next', 'extend')
await self.play_track(interaction, track)
response_message = f"Сейчас играет: **{album.title}**!"
title = await self.play_track(interaction, track)
if title:
if guild['current_player'] is not None and interaction.message:
await interaction.message.delete()
else:
await interaction.respond(f"Сейчас играет: **{album.title}**!", delete_after=15)
if guild['current_player'] is not None and interaction.message:
await interaction.message.delete()
else:
await interaction.respond(response_message, delete_after=15)
class PlayArtistButton(Button, VoiceExtension):
def __init__(self, artist: Artist, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self)
self.artist = artist
async def callback(self, interaction: Interaction) -> None:
if not interaction.guild or not await self.voice_check(interaction):
return
artist_tracks = await self.artist.get_tracks_async(page_size=500)
if not artist_tracks:
return
gid = interaction.guild.id
guild = self.db.get_guild(gid)
tracks: list[Track] = artist_tracks.tracks
if guild['current_track'] is not None:
self.db.modify_track(gid, tracks, 'next', 'extend')
response_message = f"Песни артиста **{self.artist.name}** были добавлены в очередь."
else:
track = tracks.pop(0)
self.db.modify_track(gid, tracks, 'next', 'extend')
await self.play_track(interaction, track)
response_message = f"Сейчас играет: **{self.artist.name}**!"
if guild['current_player'] is not None and interaction.message:
await interaction.message.delete()
await interaction.respond(response_message, delete_after=15)
class PlayPlaylistButton(Button, VoiceExtension):
def __init__(self, playlist: Playlist, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self)
self.playlist = playlist
async def callback(self, interaction: Interaction) -> None:
if not interaction.guild or not await self.voice_check(interaction):
return
short_tracks = await self.playlist.fetch_tracks_async()
if not short_tracks:
return
gid = interaction.guild.id
guild = self.db.get_guild(gid)
tracks: list[Track] = [cast(Track, short_track.track) for short_track in short_tracks]
if guild['current_track'] is not None:
self.db.modify_track(gid, tracks, 'next', 'extend')
response_message = f"Плейлист **{self.playlist.title}** был добавлен в очередь."
else:
track = tracks.pop(0)
self.db.modify_track(gid, tracks, 'next', 'extend')
await self.play_track(interaction, track)
response_message = f"Сейчас играет: **{self.playlist.title}**!"
if guild['current_player'] is not None and interaction.message:
await interaction.message.delete()
await interaction.respond(response_message, delete_after=15)
class ListenTrack(View):
def __init__(self, track: Track, album_id: int, *items: Item, timeout: float | None = 360, disable_on_timeout: bool = True):
def __init__(self, track: Track, album_id: int, *items: Item, timeout: float | None = None, disable_on_timeout: bool = False):
super().__init__(*items, timeout=timeout, disable_on_timeout=disable_on_timeout)
link_app = f"yandexmusic://album/{album_id}/track/{track.id}"
link_web = f"https://music.yandex.ru/album/{album_id}/track/{track.id}"
self.button1 = Button(label="Слушать в приложении", style=ButtonStyle.gray, url=link_app)
self.button2 = Button(label="Слушать в браузере", style=ButtonStyle.gray, url=link_web)
self.button3 = PlayTrackButton(track, label="Слушать в голосовом канале", style=ButtonStyle.gray)
self.button1: Button = Button(label="Слушать в приложении", style=ButtonStyle.gray, url=link_app)
self.button2: Button = Button(label="Слушать в браузере", style=ButtonStyle.gray, url=link_web)
self.button3: PlayTrackButton = PlayTrackButton(track, label="Слушать в голосовом канале", style=ButtonStyle.gray)
# self.add_item(self.button1) # Discord doesn't allow well formed URLs in buttons for some reason.
self.add_item(self.button2)
self.add_item(self.button3)
class ListenAlbum(View):
def __init__(self, album: Album, *items: Item, timeout: float | None = 360, disable_on_timeout: bool = True):
def __init__(self, album: Album, *items: Item, timeout: float | None = None, disable_on_timeout: bool = False):
super().__init__(*items, timeout=timeout, disable_on_timeout=disable_on_timeout)
link_app = f"yandexmusic://album/{album.id}"
link_web = f"https://music.yandex.ru/album/{album.id}"
self.button1 = Button(label="Слушать в приложении", style=ButtonStyle.gray, url=link_app)
self.button2 = Button(label="Слушать в браузере", style=ButtonStyle.gray, url=link_web)
self.button3 = PlayAlbumButton(album, label="Слушать в голосовом канале", style=ButtonStyle.gray)
self.button1: Button = Button(label="Слушать в приложении", style=ButtonStyle.gray, url=link_app)
self.button2: Button = Button(label="Слушать в браузере", style=ButtonStyle.gray, url=link_web)
self.button3: PlayAlbumButton = PlayAlbumButton(album, label="Слушать в голосовом канале", style=ButtonStyle.gray)
# self.add_item(self.button1) # Discord doesn't allow well formed URLs in buttons for some reason.
self.add_item(self.button2)
self.add_item(self.button3)
class ListenArtist(View):
def __init__(self, artist_id: int, *items: Item, timeout: float | None = 360, disable_on_timeout: bool = True):
def __init__(self, artist: Artist, *items: Item, timeout: float | None = None, disable_on_timeout: bool = False):
super().__init__(*items, timeout=timeout, disable_on_timeout=disable_on_timeout)
link_app = f"yandexmusic://artist/{artist_id}"
link_web = f"https://music.yandex.ru/artist/{artist_id}"
self.button1 = Button(label="Слушать в приложении", style=ButtonStyle.gray, url=link_app)
self.button2 = Button(label="Слушать в браузере", style=ButtonStyle.gray, url=link_web)
link_app = f"yandexmusic://artist/{artist.id}"
link_web = f"https://music.yandex.ru/artist/{artist.id}"
self.button1: Button = Button(label="Слушать в приложении", style=ButtonStyle.gray, url=link_app)
self.button2: Button = Button(label="Слушать в браузере", style=ButtonStyle.gray, url=link_web)
self.button3: PlayArtistButton = PlayArtistButton(artist, label="Слушать в голосовом канале", style=ButtonStyle.gray)
# self.add_item(self.button1) # Discord doesn't allow well formed URLs in buttons for some reason.
self.add_item(self.button2)
self.add_item(self.button3)
class ListenPlaylist(View):
def __init__(self, playlist: Playlist, *items: Item, timeout: float | None = None, disable_on_timeout: bool = False):
super().__init__(*items, timeout=timeout, disable_on_timeout=disable_on_timeout)
link_app = f"yandexmusic://playlist/{playlist.playlist_uuid}"
link_web = f"https://music.yandex.ru/playlist/{playlist.playlist_uuid}"
self.button1: Button = Button(label="Слушать в приложении", style=ButtonStyle.gray, url=link_app)
self.button2: Button = Button(label="Слушать в браузере", style=ButtonStyle.gray, url=link_web)
self.button3: PlayPlaylistButton = PlayPlaylistButton(playlist, label="Слушать в голосовом канале", style=ButtonStyle.gray)
# self.add_item(self.button1) # Discord doesn't allow well formed URLs in buttons for some reason.
self.add_item(self.button2)
self.add_item(self.button3)
async def proccess_album(album: Album) -> discord.Embed:
"""Generate album embed.
Args:
album (yandex_music.Album): Album to process.
Returns:
discord.Embed: Album embed.
"""
title = cast(str, album.title)
track_count = album.track_count
artists = album.artists_name()
avail = cast(bool, album.available)
description = album.short_description
year = album.year
version = album.version
bests = album.bests
duration = album.duration_ms
explicit = album.explicit or album.content_warning
likes_count = album.likes_count
artist = album.artists[0]
cover_url = album.get_cover_url('400x400')
color = await get_average_color_from_url(cover_url)
if isinstance(album.labels[0], Label):
labels = [cast(Label, label).name for label in album.labels]
else:
labels = [cast(str, label) for label in album.labels]
if version:
title += f' *{version}*'
if explicit:
title += ' <:explicit:1325879701117472869>'
artist_url = f"https://music.yandex.ru/artist/{artist.id}"
artist_cover = artist.cover
if not artist_cover:
artist_cover_url = artist.get_op_image_url()
else:
artist_cover_url = artist_cover.get_url()
embed = discord.Embed(
title=title,
description=description,
color=color,
)
embed.set_thumbnail(url=cover_url)
embed.set_author(name=", ".join(artists), url=artist_url, icon_url=artist_cover_url)
if year:
embed.add_field(name="Год выпуска", value=str(year))
if duration:
duration_m = duration // 60000
duration_s = ceil(duration / 1000) - duration_m * 60
embed.add_field(name="Длительность", value=f"{duration_m}:{duration_s:02}")
if track_count is not None:
if track_count > 1:
embed.add_field(name="Треки", value=str(track_count))
else:
embed.add_field(name="Треки", value="Сингл")
if likes_count:
embed.add_field(name="Лайки", value=str(likes_count))
if len(labels) > 1:
embed.add_field(name="Лейблы", value=", ".join(labels))
else:
embed.add_field(name="Лейбл", value=", ".join(labels))
if not avail:
embed.set_footer(text=f"Трек в данный момент недоступен.")
return embed
async def process_track(track: Track) -> discord.Embed:
async def process_track(track: Track) -> Embed:
"""Generate track embed.
Args:
@@ -215,7 +215,10 @@ async def process_track(track: Track) -> discord.Embed:
color = await get_average_color_from_url(cover_url)
if explicit:
title += ' <:explicit:1325879701117472869>'
explicit_eid = getenv('EXPLICIT_EID')
if not explicit_eid:
raise ValueError('You must specify explicit emoji id in your enviroment (EXPLICIT_EID).')
title += ' <:explicit:' + explicit_eid + '>'
duration_m = duration // 60000
duration_s = ceil(duration / 1000) - duration_m * 60
@@ -262,7 +265,89 @@ async def process_track(track: Track) -> discord.Embed:
return embed
async def process_artist(artist: Artist) -> discord.Embed:
async def process_album(album: Album) -> Embed:
"""Generate album embed.
Args:
album (yandex_music.Album): Album to process.
Returns:
discord.Embed: Album embed.
"""
title = cast(str, album.title)
track_count = album.track_count
artists = album.artists_name()
avail = cast(bool, album.available)
description = album.short_description
year = album.year
version = album.version
bests = album.bests
duration = album.duration_ms
explicit = album.explicit or album.content_warning
likes_count = album.likes_count
artist = album.artists[0]
cover_url = album.get_cover_url('400x400')
color = await get_average_color_from_url(cover_url)
if isinstance(album.labels[0], Label):
labels = [cast(Label, label).name for label in album.labels]
else:
labels = [cast(str, label) for label in album.labels]
if version:
title += f' *{version}*'
if explicit:
explicit_eid = getenv('EXPLICIT_EID')
if not explicit_eid:
raise ValueError('You must specify explicit emoji id in your enviroment.')
title += ' <:explicit:' + explicit_eid + '>'
artist_url = f"https://music.yandex.ru/artist/{artist.id}"
artist_cover = artist.cover
if not artist_cover:
artist_cover_url = artist.get_op_image_url()
else:
artist_cover_url = artist_cover.get_url()
embed = discord.Embed(
title=title,
description=description,
color=color,
)
embed.set_thumbnail(url=cover_url)
embed.set_author(name=", ".join(artists), url=artist_url, icon_url=artist_cover_url)
if year:
embed.add_field(name="Год выпуска", value=str(year))
if duration:
duration_m = duration // 60000
duration_s = ceil(duration / 1000) - duration_m * 60
embed.add_field(name="Длительность", value=f"{duration_m}:{duration_s:02}")
if track_count is not None:
if track_count > 1:
embed.add_field(name="Треки", value=str(track_count))
else:
embed.add_field(name="Треки", value="Сингл")
if likes_count:
embed.add_field(name="Лайки", value=str(likes_count))
if len(labels) > 1:
embed.add_field(name="Лейблы", value=", ".join(labels))
else:
embed.add_field(name="Лейбл", value=", ".join(labels))
if not avail:
embed.set_footer(text=f"Альбом в данный момент недоступен.")
return embed
async def process_artist(artist: Artist) -> Embed:
"""Generate artist embed.
Args:
@@ -314,4 +399,55 @@ async def process_artist(artist: Artist) -> discord.Embed:
if not avail:
embed.set_footer(text=f"Артист в данный момент недоступен.")
return embed
async def process_playlist(playlist: Playlist) -> Embed:
"""Generate playlist embed.
Args:
playlist (yandex_music.Playlist): Playlist to process.
Returns:
discord.Embed: Playlist embed.
"""
title = cast(str, playlist.title)
track_count = playlist.track_count
avail = cast(bool, playlist.available)
description = playlist.description_formatted
year = playlist.created
modified = playlist.modified
duration = playlist.duration_ms
likes_count = playlist.likes_count
cover_url = f"https://{playlist.cover.uri.replace('%%', '400x400')}" # type: ignore
color = await get_average_color_from_url(cover_url)
embed = discord.Embed(
title=title,
description=description,
color=color,
)
embed.set_thumbnail(url=cover_url)
if year:
embed.add_field(name="Год создания", value=str(year).split('-')[0])
if modified:
embed.add_field(name="Изменён", value=str(modified).split('-')[0])
if duration:
duration_m = duration // 60000
duration_s = ceil(duration / 1000) - duration_m * 60
embed.add_field(name="Длительность", value=f"{duration_m}:{duration_s:02}")
if track_count is not None:
embed.add_field(name="Треки", value=str(track_count))
if likes_count:
embed.add_field(name="Лайки", value=str(likes_count))
if not avail:
embed.set_footer(text=f"Плейлист в данный момент недоступен.")
return embed

View File

@@ -1,8 +1,34 @@
from discord.ui import View, Button, Item
from discord import InteractionMessage, ButtonStyle, Interaction, ApplicationContext
from discord import ButtonStyle, Interaction, ApplicationContext
from MusicBot.cogs.utils.voice import VoiceExtension
class ToggleRepeatButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self)
async def callback(self, interaction: Interaction) -> None:
if not interaction.guild:
return
gid = interaction.guild.id
guild = self.db.get_guild(gid)
self.db.update(gid, {'repeat': not guild['repeat']})
await interaction.edit(view=Player(interaction))
class ToggleShuffleButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
VoiceExtension.__init__(self)
async def callback(self, interaction: Interaction) -> None:
if not interaction.guild:
return
gid = interaction.guild.id
guild = self.db.get_guild(gid)
self.db.update(gid, {'shuffle': not guild['shuffle']})
await interaction.edit(view=Player(interaction))
class PlayPauseButton(Button, VoiceExtension):
def __init__(self, **kwargs):
Button.__init__(self, **kwargs)
@@ -11,24 +37,21 @@ class PlayPauseButton(Button, VoiceExtension):
async def callback(self, interaction: Interaction) -> None:
if not await self.voice_check(interaction):
return
vc = self.get_voice_client(interaction)
if vc is not None:
if not vc.is_paused():
vc.pause()
message = interaction.message
if not message:
return
embed = message.embeds[0]
embed.set_footer(text='Приостановлено')
await interaction.edit(embed=embed)
else:
vc.resume()
message = interaction.message
if not message:
return
embed = message.embeds[0]
embed.remove_footer()
await interaction.edit(embed=embed)
if not vc or not interaction.message:
return
embed = interaction.message.embeds[0]
if vc.is_paused():
vc.resume()
embed.remove_footer()
else:
vc.pause()
embed.set_footer(text='Приостановлено')
await interaction.edit(embed=embed)
class NextTrackButton(Button, VoiceExtension):
def __init__(self, **kwargs):
@@ -54,24 +77,41 @@ class PrevTrackButton(Button, VoiceExtension):
if not title:
await interaction.respond(f"Нет треков в истории.", delete_after=15, ephemeral=True)
class Player(View):
class Player(View, VoiceExtension):
def __init__(self, ctx: ApplicationContext, *items: Item, timeout: float | None = 3600, disable_on_timeout: bool = True):
super().__init__(*items, timeout=timeout, disable_on_timeout=disable_on_timeout)
def __init__(self, ctx: ApplicationContext | Interaction, *items: Item, timeout: float | None = 3600, disable_on_timeout: bool = True):
View.__init__(self, *items, timeout=timeout, disable_on_timeout=disable_on_timeout)
VoiceExtension.__init__(self)
if not ctx.guild:
return
guild = self.db.get_guild(ctx.guild.id)
self.ctx = ctx
self.repeat_button = Button(style=ButtonStyle.secondary, emoji='🔂', row=0)
self.shuffle_button = Button(style=ButtonStyle.secondary, emoji='🔀', row=0)
self.repeat_button_off = ToggleRepeatButton(style=ButtonStyle.secondary, emoji='🔂', row=0)
self.repeat_button_on = ToggleRepeatButton(style=ButtonStyle.success, emoji='🔂', row=0)
self.shuffle_button_off = ToggleShuffleButton(style=ButtonStyle.secondary, emoji='🔀', row=0)
self.shuffle_button_on = ToggleShuffleButton(style=ButtonStyle.success, emoji='🔀', row=0)
self.play_pause_button = PlayPauseButton(style=ButtonStyle.primary, emoji='', row=0)
self.next_button = NextTrackButton(style=ButtonStyle.primary, emoji='', row=0)
self.prev_button = PrevTrackButton(style=ButtonStyle.primary, emoji='', row=0)
self.queue_button = Button(style=ButtonStyle.primary, emoji='📋', row=1)
self.add_item(self.repeat_button)
if guild['repeat']:
self.add_item(self.repeat_button_on)
else:
self.add_item(self.repeat_button_off)
self.add_item(self.prev_button)
self.add_item(self.play_pause_button)
self.add_item(self.next_button)
self.add_item(self.shuffle_button)
if guild['shuffle']:
self.add_item(self.shuffle_button_on)
else:
self.add_item(self.shuffle_button_off)

View File

@@ -1,5 +1,6 @@
import aiohttp
import asyncio
from os import getenv
from math import ceil
from typing import cast
from io import BytesIO
@@ -39,7 +40,10 @@ async def generate_player_embed(track: Track) -> discord.Embed:
color = await get_average_color_from_url(cover_url)
if explicit:
title += ' <:explicit:1325879701117472869>'
explicit_eid = getenv('EXPLICIT_EID')
if not explicit_eid:
raise ValueError('You must specify explicit emoji id in your enviroment.')
title += ' <:explicit:' + explicit_eid + '>'
duration_m = duration // 60000
duration_s = ceil(duration / 1000) - duration_m * 60
@@ -99,10 +103,10 @@ async def get_average_color_from_url(url: str) -> int:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
response = await response.read()
result = await response.read()
img = Image.open(BytesIO(response))
img = img.convert('RGB')
img_file = Image.open(BytesIO(result))
img = img_file.convert('RGB')
width, height = img.size
r_total, g_total, b_total = 0, 0, 0
@@ -157,7 +161,7 @@ class VoiceExtension:
# If interaction from player buttons
await ctx.edit(embed=embed)
else:
# If interaction from other buttons. They should have thair own response.
# If interaction from other buttons. They should have their own response.
await player.edit(embed=embed)
async def voice_check(self, ctx: ApplicationContext | Interaction) -> bool:
@@ -174,7 +178,7 @@ class VoiceExtension:
token = self.users_db.get_ym_token(ctx.user.id)
if not token:
await ctx.respond('❌ Необходимо указать свой токен доступа с помощью комманды /login.', delete_after=15, ephemeral=True)
await ctx.respond("❌ Необходимо указать свой токен доступа с помощью комманды /login.", delete_after=15, ephemeral=True)
return False
channel = ctx.channel
@@ -200,7 +204,7 @@ class VoiceExtension:
ctx (ApplicationContext | Interaction): Command context.
Returns:
discord.VoiceClient | None: Voice client.
discord.VoiceClient | None: Voice client or None.
"""
if isinstance(ctx, Interaction):
@@ -211,8 +215,8 @@ class VoiceExtension:
return cast(discord.VoiceClient, voice_chat)
async def play_track(self, ctx: ApplicationContext | Interaction, track: Track) -> str | None:
"""Download ``track`` by its id and play it in the voice channel. Return track title on success and don't respond.
If sound is already playing, add track id to the queue and respond.
"""Download ``track`` by its id and play it in the voice channel. Return track title on success.
If sound is already playing, add track id to the queue. There's no response to the context.
Args:
ctx (ApplicationContext | Interaction): Context
@@ -222,11 +226,11 @@ class VoiceExtension:
str | None: Song title or None.
"""
if not ctx.guild:
return
return None
vc = self.get_voice_client(ctx)
if not vc:
return
return None
if isinstance(ctx, Interaction):
loop = ctx.client.loop
@@ -253,11 +257,13 @@ class VoiceExtension:
vc = self.get_voice_client(ctx)
if vc:
vc.pause()
return
def resume_playing(self, ctx: ApplicationContext | Interaction) -> None:
vc = self.get_voice_client(ctx)
if vc:
vc.resume()
return
def stop_playing(self, ctx: ApplicationContext | Interaction) -> None:
if not ctx.guild:
@@ -267,10 +273,11 @@ class VoiceExtension:
if vc:
self.db.update(ctx.guild.id, {'current_track': None, 'is_stopped': True})
vc.stop()
return
async def next_track(self, ctx: ApplicationContext | Interaction) -> str | None:
"""Switch to the next track in the queue. Return track title on success.
Stop playing if tracks list is empty.
Doesn't change track if stopped. Stop playing if tracks list is empty.
Args:
ctx (ApplicationContext | Interaction): Context
@@ -279,31 +286,36 @@ class VoiceExtension:
str | None: Track title or None.
"""
if not ctx.guild or not ctx.user:
return
return None
gid = ctx.guild.id
guild = self.db.get_guild(gid)
token = self.users_db.get_ym_token(ctx.user.id)
if guild.get('is_stopped'):
return
if guild['is_stopped']:
return None
if not self.get_voice_client(ctx): # Silently return if bot got kicked
return
return None
current_track = guild['current_track']
next_track = self.db.get_track(gid, 'next')
if next_track and current_track:
ym_track = None
if guild['repeat'] and current_track:
return await self.repeat_current_track(ctx)
elif guild['shuffle']:
next_track = self.db.get_random_track(gid)
else:
next_track = self.db.get_track(gid, 'next')
if current_track:
self.db.modify_track(gid, current_track, 'previous', 'insert')
if next_track:
ym_track = Track.de_json(next_track, client=ClientAsync(token)) # type: ignore
self.stop_playing(ctx)
return await self.play_track(ctx, ym_track) # type: ignore
elif next_track:
ym_track = Track.de_json(next_track, client=ClientAsync(token)) # type: ignore
self.stop_playing(ctx)
return await self.play_track(ctx, ym_track) # type: ignore
elif current_track:
self.db.modify_track(gid, current_track, 'previous', 'insert')
self.stop_playing(ctx)
return None
async def prev_track(self, ctx: ApplicationContext | Interaction) -> str | None:
"""Switch to the previous track in the queue. Repeat curren the song if no previous tracks.
@@ -317,21 +329,22 @@ class VoiceExtension:
"""
if not ctx.guild or not ctx.user:
return
return None
gid = ctx.guild.id
token = self.users_db.get_ym_token(ctx.user.id)
current_track = self.db.get_track(gid, 'current')
prev_track = self.db.get_track(gid, 'previous')
title = None
if prev_track:
if current_track:
self.db.modify_track(gid, current_track, 'next', 'insert')
ym_track = Track.de_json(prev_track, client=ClientAsync(token)) # type: ignore
self.stop_playing(ctx)
return await self.play_track(ctx, ym_track) # type: ignore
title = await self.play_track(ctx, ym_track) # type: ignore
elif current_track:
return await self.repeat_current_track(ctx)
title = await self.repeat_current_track(ctx)
return title
async def repeat_current_track(self, ctx: ApplicationContext | Interaction) -> str | None:
"""Repeat current track. Return track title on success.
@@ -344,7 +357,7 @@ class VoiceExtension:
"""
if not ctx.guild or not ctx.user:
return
return None
gid = ctx.guild.id
token = self.users_db.get_ym_token(ctx.user.id)
@@ -354,3 +367,5 @@ class VoiceExtension:
ym_track = Track.de_json(current_track, client=ClientAsync(token)) # type: ignore
self.stop_playing(ctx)
return await self.play_track(ctx, ym_track) # type: ignore
return None

View File

@@ -14,72 +14,79 @@ def setup(bot: discord.Bot):
class Voice(Cog, VoiceExtension):
voice = discord.SlashCommandGroup("voice", "Команды, связанные с голосовым каналом.", [1247100229535141899])
queue = discord.SlashCommandGroup("queue", "Команды, связанные с очередью треков.", [1247100229535141899])
track = discord.SlashCommandGroup("track", "Команды, связанные с текущим треком.", [1247100229535141899])
voice = discord.SlashCommandGroup("voice", "Команды, связанные с голосовым каналом.")
queue = discord.SlashCommandGroup("queue", "Команды, связанные с очередью треков.")
track = discord.SlashCommandGroup("track", "Команды, связанные с текущим треком.")
@voice.command(name="menu", description="Toggle player menu. Available only if you're the only one in the vocie channel.")
@voice.command(name="menu", description="Переключить меню проигрывателя. Доступно только если вы единственный в голосовом канале.")
async def menu(self, ctx: discord.ApplicationContext) -> None:
if not await self.voice_check(ctx):
return
current_track = self.db.get_track(ctx.guild.id, 'current')
try:
embed = await process_track(Track.de_json(current_track, client=ClientAsync())) # type: ignore
guild = self.db.get_guild(ctx.guild.id)
embed = None
if guild['current_track']:
embed = await process_track(Track.de_json(guild['current_track'], client=ClientAsync())) # type: ignore
vc = self.get_voice_client(ctx)
if not vc:
return
if not vc.is_paused():
if vc and vc.is_paused():
embed.set_footer(text='Приостановлено')
else:
embed.remove_footer()
except AttributeError:
embed = None
if guild['current_player']:
message = await ctx.fetch_message(guild['current_player'])
await message.delete()
interaction = cast(discord.Interaction, await ctx.respond(view=Player(ctx), embed=embed, delete_after=3600))
response = await interaction.original_response()
self.db.update(ctx.guild.id, {'current_player': response.id})
@voice.command(name="join", description="Join the voice channel you're currently in.")
@voice.command(name="join", description="Подключиться к голосовому каналу, в котором вы сейчас находитесь.")
async def join(self, ctx: discord.ApplicationContext) -> None:
vc = self.get_voice_client(ctx)
if vc is not None and vc.is_playing():
await ctx.respond("❌ Бот уже находится в голосовом канале. Выключите его с помощью команды /voice leave.", delete_after=15, ephemeral=True)
elif ctx.channel is not None and isinstance(ctx.channel, discord.VoiceChannel):
if vc and vc.is_playing():
response_message = "❌ Бот уже находится в голосовом канале. Выключите его с помощью команды /voice leave."
elif isinstance(ctx.channel, discord.VoiceChannel):
await ctx.channel.connect(timeout=15)
await ctx.respond("Подключение успешно!", delete_after=15, ephemeral=True)
response_message = "Подключение успешно!"
else:
await ctx.respond("❌ Вы должны отправить команду в голосовом канале.", delete_after=15, ephemeral=True)
response_message = "❌ Вы должны отправить команду в голосовом канале."
await ctx.respond(response_message, delete_after=15, ephemeral=True)
@voice.command(description="Force the bot to leave the voice channel.")
@voice.command(description="Заставить бота покинуть голосовой канал.")
async def leave(self, ctx: discord.ApplicationContext) -> None:
vc = self.get_voice_client(ctx)
if await self.voice_check(ctx) and vc is not None:
if vc and await self.voice_check(ctx):
self.stop_playing(ctx)
self.db.clear_history(ctx.guild.id)
await vc.disconnect(force=True)
await ctx.respond("Отключение успешно!", delete_after=15, ephemeral=True)
@queue.command(description="Clear tracks queue and history.")
@queue.command(description="Очистить очередь и историю треков.")
async def clear(self, ctx: discord.ApplicationContext) -> None:
if not await self.voice_check(ctx):
return
self.db.clear_history(ctx.guild.id)
await ctx.respond("Очередь и история сброшены.", delete_after=15, ephemeral=True)
@queue.command(description="Get tracks queue.")
@queue.command(description="Получить очередь треков.")
async def get(self, ctx: discord.ApplicationContext) -> None:
if await self.voice_check(ctx):
tracks_list = self.db.get_tracks_list(ctx.guild.id, 'next')
embed = discord.Embed(
title='Список треков',
color=discord.Color.dark_purple()
)
for i, track in enumerate(tracks_list, start=1):
embed.add_field(name=f"{i} - {track.get('title')}", value="", inline=False)
if i == 25:
break
await ctx.respond("", embed=embed, ephemeral=True)
if not await self.voice_check(ctx):
return
tracks_list = self.db.get_tracks_list(ctx.guild.id, 'next')
embed = discord.Embed(
title='Список треков',
color=discord.Color.dark_purple()
)
for i, track in enumerate(tracks_list, start=1):
embed.add_field(name=f"{i} - {track.get('title')}", value="", inline=False)
if i == 25:
break
await ctx.respond("", embed=embed, ephemeral=True)
@track.command(description="Pause the current track.")
@track.command(description="Приостановить текущий трек.")
async def pause(self, ctx: discord.ApplicationContext) -> None:
vc = self.get_voice_client(ctx)
if await self.voice_check(ctx) and vc is not None:
@@ -89,7 +96,7 @@ class Voice(Cog, VoiceExtension):
else:
await ctx.respond("Воспроизведение уже приостановлено.", delete_after=15, ephemeral=True)
@track.command(description="Resume the current track.")
@track.command(description="Возобновить текущий трек.")
async def resume(self, ctx: discord.ApplicationContext) -> None:
vc = self.get_voice_client(ctx)
if await self.voice_check(ctx) and vc is not None:
@@ -99,14 +106,19 @@ class Voice(Cog, VoiceExtension):
else:
await ctx.respond("Воспроизведение не на паузе.", delete_after=15, ephemeral=True)
@track.command(description="Stop the current track and clear the queue and history.")
@track.command(description="Остановить текущий трек и очистите очередь и историю.")
async def stop(self, ctx: discord.ApplicationContext) -> None:
if await self.voice_check(ctx):
self.db.clear_history(ctx.guild.id)
self.stop_playing(ctx)
current_player = self.db.get_guild(ctx.guild.id)['current_player']
if current_player is not None:
message = await ctx.fetch_message(current_player)
await message.delete()
self.db.update(ctx.guild.id, {'current_player': None, 'repeat': False, 'shuffle': False})
await ctx.respond("Воспроизведение остановлено.", delete_after=15, ephemeral=True)
@track.command(description="Switch to the next song in the queue.")
@track.command(description="Переключиться на следующую песню в очереди.")
async def next(self, ctx: discord.ApplicationContext) -> None:
if await self.voice_check(ctx):
gid = ctx.guild.id

View File

@@ -74,7 +74,9 @@ class BaseGuildsDatabase:
current_player=None,
is_stopped=True,
allow_explicit=True,
allow_menu=True
allow_menu=True,
shuffle=False,
repeat=False
))
def update(self, gid: int, data: Guild) -> None:

View File

@@ -1,4 +1,5 @@
from typing import Any, Literal, cast
from random import randint
from typing import Any, Literal
from yandex_music import Track
from MusicBot.database import BaseGuildsDatabase
@@ -47,14 +48,17 @@ class VoiceGuildsDatabase(BaseGuildsDatabase):
elif type == 'next':
tracks = guild['next_tracks']
if not tracks:
return
return None
track = tracks.pop(0)
self.update(gid, {'next_tracks': tracks})
elif type == 'previous':
tracks = guild['previous_tracks']
if not tracks:
return
return None
track = tracks.pop(0)
current_track = guild['current_track']
if current_track:
self.modify_track(gid, current_track, 'next', 'insert')
self.update(gid, {'previous_tracks': tracks})
return track
@@ -63,7 +67,7 @@ class VoiceGuildsDatabase(BaseGuildsDatabase):
self, gid: int,
track: Track | dict[str, Any] | list[dict[str, Any]] | list[Track],
type: Literal['next', 'previous'],
operation: Literal['insert', 'append', 'extend', 'pop_start', 'pop_end']
operation: Literal['insert', 'append', 'extend', 'pop_start', 'pop_end', 'pop_random']
) -> dict[str, Any] | None:
"""Perform operation of given type on tracks list of given type.
@@ -77,10 +81,16 @@ class VoiceGuildsDatabase(BaseGuildsDatabase):
dict[str, Any] | None: Dictionary convertable to yandex_music.Track or None.
"""
guild = self.get_guild(gid)
explicit_type: Literal['next_tracks', 'previous_tracks'] = type + '_tracks'
if type not in ('next_tracks', 'previous_tracks'):
raise ValueError(f"Type must be either 'next' or 'previous', not '{type}'")
explicit_type: Literal['next_tracks', 'previous_tracks'] = type + '_tracks' # type: ignore[assignment]
tracks = guild[explicit_type]
pop_track = None
if not tracks:
return None
if isinstance(track, list):
tracks_list = []
for _track in track:
@@ -106,15 +116,34 @@ class VoiceGuildsDatabase(BaseGuildsDatabase):
elif operation == 'pop_start':
pop_track = tracks.pop(0)
elif operation == 'pop_end':
pop_track = tracks.pop(0)
pop_track = tracks.pop(-1)
elif operation == 'pop_random':
pop_track = tracks.pop(randint(0, len(tracks)))
elif operation == 'extend':
raise ValueError('Can only use extend operation on lists.')
else:
raise ValueError(f"Unknown operation '{operation}'")
self.update(gid, {explicit_type: tracks}) # type: ignore
if pop_track:
return pop_track
return pop_track
def get_random_track(self, gid: int) -> dict[str, Any] | None:
"""Pop random track from the queue.
Args:
gid (int): Guild id.
Returns:
dict[str, Any] | None: Dictionary covertable to yandex_musci.Track or None
"""
tracks = self.get_tracks_list(gid, 'next')
if not tracks:
return None
track = tracks.pop()
self.update(gid, {'next_tracks': tracks})
return track
def set_current_track(self, gid: int, track: Track | dict[str, Any]) -> None:
if isinstance(track, Track):
track = track.to_dict()

View File

@@ -8,6 +8,8 @@ class Guild(TypedDict, total=False):
is_stopped: bool
allow_explicit: bool
allow_menu: bool
shuffle: bool
repeat: bool
class ExplicitGuild(TypedDict):
_id: int
@@ -17,4 +19,6 @@ class ExplicitGuild(TypedDict):
current_player: int | None
is_stopped: bool # Prevents the `after` callback of play_track
allow_explicit: bool
allow_menu: bool # Is /toggle menu command available
allow_menu: bool # /toggle menu is only available if there's only one user in the voice chat.
shuffle: bool
repeat: bool

View File

@@ -1,8 +1,8 @@
import os
import logging
from dotenv import load_dotenv
import discord
from discord.errors import NotFound
from discord.ext.commands import Bot
try:
@@ -11,29 +11,33 @@ try:
except ImportError:
pass
intents = discord.Intents.all()
intents = discord.Intents.default()
intents.message_content = True
bot = Bot(intents=intents)
cogs_list = [
'general',
'voice'
]
for cog in cogs_list:
bot.load_extension(f'MusicBot.cogs.{cog}')
@bot.event
async def on_ready():
logging.info("Bot's ready!")
if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()
if not os.path.exists('music'):
os.mkdir('music')
token = os.getenv('TOKEN')
if not token:
raise ValueError('You must specify the bot TOKEN in your enviroment')
for cog in cogs_list:
bot.load_extension(f'MusicBot.cogs.{cog}')
logging.basicConfig(format='%(asctime)s:%(levelname)s:%(name)s: %(message)s')
logging.getLogger('discord').setLevel(logging.INFO)
bot.run(token)