mirror of
https://github.com/deadcxap/YandexMusicDiscordBot.git
synced 2026-01-11 12:51:40 +03:00
impr: Async database and code optimization.
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
from .base import BaseGuildsDatabase, BaseUsersDatabase
|
||||
from .base import BaseGuildsDatabase, BaseUsersDatabase, guilds, users
|
||||
from .extensions import VoiceGuildsDatabase
|
||||
|
||||
from .user import User, ExplicitUser
|
||||
@@ -12,5 +12,7 @@ __all__ = [
|
||||
'ExplicitUser',
|
||||
'Guild',
|
||||
'ExplicitGuild',
|
||||
'MessageVotes'
|
||||
'MessageVotes',
|
||||
'guilds',
|
||||
'users',
|
||||
]
|
||||
@@ -1,186 +1,112 @@
|
||||
"""This documents initialises databse and contains methods to access it."""
|
||||
|
||||
from typing import Any, cast
|
||||
|
||||
from pymongo import MongoClient
|
||||
from pymongo.collection import Collection
|
||||
from typing import Iterable, Any, cast
|
||||
from pymongo import AsyncMongoClient, ReturnDocument
|
||||
from pymongo.asynchronous.collection import AsyncCollection
|
||||
from pymongo.results import UpdateResult
|
||||
|
||||
from .user import User, ExplicitUser
|
||||
from .guild import Guild, ExplicitGuild, MessageVotes
|
||||
|
||||
client: MongoClient = MongoClient("mongodb://localhost:27017/")
|
||||
users: Collection[ExplicitUser] = client.YandexMusicBot.users
|
||||
guilds: Collection[ExplicitGuild] = client.YandexMusicBot.guilds
|
||||
client: AsyncMongoClient = AsyncMongoClient("mongodb://localhost:27017/")
|
||||
|
||||
db = client.YandexMusicBot
|
||||
users: AsyncCollection[ExplicitUser] = db.users
|
||||
guilds: AsyncCollection[ExplicitGuild] = db.guilds
|
||||
|
||||
class BaseUsersDatabase:
|
||||
DEFAULT_USER = ExplicitUser(
|
||||
_id=0,
|
||||
ym_token=None,
|
||||
playlists=[],
|
||||
playlists_page=0,
|
||||
queue_page=0,
|
||||
vibe_batch_id=None,
|
||||
vibe_type=None,
|
||||
vibe_id=None,
|
||||
vibe_settings={
|
||||
'mood': 'all',
|
||||
'diversity': 'default',
|
||||
'lang': 'any'
|
||||
}
|
||||
)
|
||||
|
||||
def create_record(self, uid: int) -> None:
|
||||
"""Create user database record.
|
||||
|
||||
Args:
|
||||
uid (int): User id.
|
||||
"""
|
||||
uid = uid
|
||||
users.insert_one(ExplicitUser(
|
||||
_id=uid,
|
||||
ym_token=None,
|
||||
playlists=[],
|
||||
playlists_page=0,
|
||||
queue_page=0,
|
||||
vibe_batch_id=None,
|
||||
vibe_type=None,
|
||||
vibe_id=None,
|
||||
vibe_settings={
|
||||
'mood': 'all',
|
||||
'diversity': 'default',
|
||||
'lang': 'any'
|
||||
}
|
||||
))
|
||||
|
||||
def update(self, uid: int, data: User | dict[Any, Any]) -> None:
|
||||
"""Update user record.
|
||||
|
||||
Args:
|
||||
uid (int): User id.
|
||||
data (User | dict[Any, Any]): Updated data.
|
||||
"""
|
||||
self.get_user(uid)
|
||||
users.update_one({'_id': uid}, {"$set": data})
|
||||
|
||||
def get_user(self, uid: int) -> ExplicitUser:
|
||||
"""Get user record from database. Create new entry if not present.
|
||||
|
||||
Args:
|
||||
uid (int): User id.
|
||||
|
||||
Returns:
|
||||
User: User record.
|
||||
"""
|
||||
user = users.find_one({'_id': uid})
|
||||
if not user:
|
||||
self.create_record(uid)
|
||||
user = users.find_one({'_id': uid})
|
||||
user = cast(ExplicitUser, user)
|
||||
existing_fields = user.keys()
|
||||
fields: ExplicitUser = ExplicitUser(
|
||||
_id=0,
|
||||
ym_token=None,
|
||||
playlists=[],
|
||||
playlists_page=0,
|
||||
queue_page=0,
|
||||
vibe_batch_id=None,
|
||||
vibe_type=None,
|
||||
vibe_id=None,
|
||||
vibe_settings={
|
||||
'mood': 'all',
|
||||
'diversity': 'default',
|
||||
'lang': 'any'
|
||||
}
|
||||
async def update(self, uid: int, data: User | dict[str, Any]) -> UpdateResult:
|
||||
return await users.update_one(
|
||||
{'_id': uid},
|
||||
{'$set': data},
|
||||
upsert=True
|
||||
)
|
||||
|
||||
async def get_user(self, uid: int, projection: User | Iterable[str] | None = None) -> ExplicitUser:
|
||||
user = await users.find_one_and_update(
|
||||
{'_id': uid},
|
||||
{'$setOnInsert': self.DEFAULT_USER},
|
||||
return_document=ReturnDocument.AFTER,
|
||||
upsert=True,
|
||||
projection=projection
|
||||
)
|
||||
return cast(ExplicitUser, user)
|
||||
|
||||
async def get_ym_token(self, uid: int) -> str | None:
|
||||
user = await users.find_one(
|
||||
{'_id': uid},
|
||||
projection={'ym_token': 1}
|
||||
)
|
||||
return cast(str | None, user.get('ym_token') if user else None)
|
||||
|
||||
async def add_playlist(self, uid: int, playlist_data: dict) -> UpdateResult:
|
||||
return await users.update_one(
|
||||
{'_id': uid},
|
||||
{'$push': {'playlists': playlist_data}}
|
||||
)
|
||||
for field, default_value in fields.items():
|
||||
if field not in existing_fields:
|
||||
user[field] = default_value
|
||||
users.update_one({'_id': uid}, {"$set": {field: default_value}})
|
||||
|
||||
return user
|
||||
|
||||
def get_ym_token(self, uid: int) -> str | None:
|
||||
user = users.find_one({'_id': uid})
|
||||
if not user:
|
||||
self.create_record(uid)
|
||||
user = users.find_one({'_id': uid})
|
||||
return cast(ExplicitUser, user)['ym_token']
|
||||
|
||||
class BaseGuildsDatabase:
|
||||
|
||||
def create_record(self, gid: int) -> None:
|
||||
"""Create guild database record.
|
||||
DEFAULT_GUILD = ExplicitGuild(
|
||||
_id=0,
|
||||
next_tracks=[],
|
||||
previous_tracks=[],
|
||||
current_track=None,
|
||||
current_menu=None,
|
||||
is_stopped=True,
|
||||
allow_explicit=True,
|
||||
always_allow_menu=False,
|
||||
vote_next_track=True,
|
||||
vote_add_track=True,
|
||||
vote_add_album=True,
|
||||
vote_add_artist=True,
|
||||
vote_add_playlist=True,
|
||||
shuffle=False,
|
||||
repeat=False,
|
||||
votes={},
|
||||
vibing=False,
|
||||
current_viber_id=None
|
||||
)
|
||||
|
||||
Args:
|
||||
gid (int): Guild id.
|
||||
"""
|
||||
guilds.insert_one(ExplicitGuild(
|
||||
_id=gid,
|
||||
next_tracks=[],
|
||||
previous_tracks=[],
|
||||
current_track=None,
|
||||
current_menu=None,
|
||||
is_stopped=True,
|
||||
allow_explicit=True,
|
||||
always_allow_menu=False,
|
||||
vote_next_track=True,
|
||||
vote_add_track=True,
|
||||
vote_add_album=True,
|
||||
vote_add_artist=True,
|
||||
vote_add_playlist=True,
|
||||
shuffle=False,
|
||||
repeat=False,
|
||||
votes={},
|
||||
vibing=False,
|
||||
current_viber_id=None
|
||||
))
|
||||
|
||||
def update(self, gid: int, data: Guild) -> None:
|
||||
"""Update guild record.
|
||||
|
||||
Args:
|
||||
gid (int): Guild id.
|
||||
data (dict[Any, Any]): Updated data.
|
||||
"""
|
||||
self.get_guild(gid)
|
||||
guilds.update_one({'_id': gid}, {"$set": data})
|
||||
|
||||
def get_guild(self, gid: int) -> ExplicitGuild:
|
||||
"""Get guild record from database. Create new entry if not present.
|
||||
|
||||
Args:
|
||||
uid (int): User id.
|
||||
|
||||
Returns:
|
||||
Guild: Guild record.
|
||||
"""
|
||||
guild = guilds.find_one({'_id': gid})
|
||||
if not guild:
|
||||
self.create_record(gid)
|
||||
guild = guilds.find_one({'_id': gid})
|
||||
|
||||
guild = cast(ExplicitGuild, guild)
|
||||
existing_fields = guild.keys()
|
||||
fields = ExplicitGuild(
|
||||
_id=0,
|
||||
next_tracks=[],
|
||||
previous_tracks=[],
|
||||
current_track=None,
|
||||
current_menu=None,
|
||||
is_stopped=True,
|
||||
allow_explicit=True,
|
||||
always_allow_menu=False,
|
||||
vote_next_track=True,
|
||||
vote_add_track=True,
|
||||
vote_add_album=True,
|
||||
vote_add_artist=True,
|
||||
vote_add_playlist=True,
|
||||
shuffle=False,
|
||||
repeat=False,
|
||||
votes={},
|
||||
vibing=False,
|
||||
current_viber_id=None
|
||||
async def update(self, gid: int, data: Guild | dict[str, Any]) -> UpdateResult:
|
||||
return await guilds.update_one(
|
||||
{'_id': gid},
|
||||
{'$set': data},
|
||||
upsert=True
|
||||
)
|
||||
|
||||
async def get_guild(self, gid: int, projection: Guild | Iterable[str] | None = None) -> ExplicitGuild:
|
||||
guild = await guilds.find_one_and_update(
|
||||
{'_id': gid},
|
||||
{'$setOnInsert': self.DEFAULT_GUILD},
|
||||
return_document=ReturnDocument.AFTER,
|
||||
upsert=True,
|
||||
projection=projection
|
||||
)
|
||||
return cast(ExplicitGuild, guild)
|
||||
|
||||
async def update_vote(self, gid: int, mid: int, data: MessageVotes) -> UpdateResult:
|
||||
return await guilds.update_one(
|
||||
{'_id': gid},
|
||||
{'$set': {f'votes.{mid}': data}}
|
||||
)
|
||||
|
||||
async def clear_queue(self, gid: int) -> UpdateResult:
|
||||
return await guilds.update_one(
|
||||
{'_id': gid},
|
||||
{'$set': {'next_tracks': []}}
|
||||
)
|
||||
for field, default_value in fields.items():
|
||||
if field not in existing_fields:
|
||||
guild[field] = default_value
|
||||
guilds.update_one({'_id': gid}, {"$set": {field: default_value}})
|
||||
|
||||
return guild
|
||||
|
||||
def update_vote(self, gid: int, mid: int, data: MessageVotes) -> None:
|
||||
"""Update vote for a message in a guild.
|
||||
|
||||
Args:
|
||||
gid (int): Guild id.
|
||||
mid (int): Message id.
|
||||
vote (bool): Vote value.
|
||||
"""
|
||||
guild = self.get_guild(gid)
|
||||
guild['votes'][str(mid)] = data
|
||||
guilds.update_one({'_id': gid}, {"$set": {'votes': guild['votes']}})
|
||||
@@ -1,145 +1,222 @@
|
||||
from random import randint
|
||||
from typing import Any, Literal
|
||||
from yandex_music import Track
|
||||
from pymongo import UpdateOne, ReturnDocument
|
||||
from pymongo.errors import DuplicateKeyError
|
||||
|
||||
from MusicBot.database import BaseGuildsDatabase
|
||||
from MusicBot.database import BaseGuildsDatabase, guilds
|
||||
|
||||
class VoiceGuildsDatabase(BaseGuildsDatabase):
|
||||
|
||||
def get_tracks_list(self, gid: int, type: Literal['next', 'previous']) -> list[dict[str, Any]]:
|
||||
"""Get tracks list with given type.
|
||||
async def get_tracks_list(self, gid: int, list_type: Literal['next', 'previous']) -> list[dict[str, Any]]:
|
||||
if list_type not in ('next', 'previous'):
|
||||
raise ValueError("list_type must be either 'next' or 'previous'")
|
||||
projection = {f"{list_type}_tracks": 1}
|
||||
guild = await self.get_guild(gid, projection=projection)
|
||||
return guild.get(f"{list_type}_tracks", [])
|
||||
|
||||
Args:
|
||||
gid (int): Guild id.
|
||||
type (Literal['current', 'next', 'previous']): Track type.
|
||||
|
||||
Returns:
|
||||
dict[str, Any] | None: Dictionary covertable to yandex_musci.Track or None
|
||||
"""
|
||||
guild = self.get_guild(gid)
|
||||
if type == 'next':
|
||||
tracks = guild['next_tracks']
|
||||
elif type == 'previous':
|
||||
tracks = guild['previous_tracks']
|
||||
async def get_track(self, gid: int, list_type: Literal['next', 'previous', 'current']) -> dict[str, Any] | None:
|
||||
if list_type not in ('next', 'previous', 'current'):
|
||||
raise ValueError("list_type must be either 'next' or 'previous'")
|
||||
|
||||
return tracks
|
||||
|
||||
def get_track(self, gid: int, type: Literal['current', 'next', 'previous']) -> dict[str, Any] | None:
|
||||
"""Get track with given type. Pop the track from list if `type` is 'next' or 'previous'.
|
||||
|
||||
Args:
|
||||
gid (int): Guild id.
|
||||
type (Literal['current', 'next', 'previous']): Track type.
|
||||
|
||||
Returns:
|
||||
dict[str, Any] | None: Dictionary covertable to yandex_musci.Track or None
|
||||
"""
|
||||
guild = self.get_guild(gid)
|
||||
if type == 'current':
|
||||
track = guild['current_track']
|
||||
elif type == 'next':
|
||||
tracks = guild['next_tracks']
|
||||
if not tracks:
|
||||
return None
|
||||
track = tracks.pop(0)
|
||||
self.update(gid, {'next_tracks': tracks})
|
||||
elif type == 'previous':
|
||||
tracks = guild['previous_tracks']
|
||||
if not tracks:
|
||||
return None
|
||||
track = tracks.pop(0)
|
||||
current_track = guild['current_track']
|
||||
if current_track:
|
||||
self.modify_track(gid, current_track, 'next', 'insert')
|
||||
self.update(gid, {'previous_tracks': tracks})
|
||||
if list_type == 'current':
|
||||
return (await self.get_guild(gid, projection={'current_track': 1}))['current_track']
|
||||
|
||||
field = f'{list_type}_tracks'
|
||||
update = {'$pop': {field: -1}}
|
||||
result = await guilds.find_one_and_update(
|
||||
{'_id': gid},
|
||||
update,
|
||||
projection={field: 1},
|
||||
return_document=ReturnDocument.BEFORE
|
||||
)
|
||||
|
||||
res = result.get(field, [])[0] if result and result.get(field) else None
|
||||
|
||||
if field == 'previous_tracks' and res:
|
||||
await guilds.find_one_and_update(
|
||||
{'_id': gid},
|
||||
{'$push': {'next_tracks': {'$each': [res], '$position': 0}}},
|
||||
projection={'next_tracks': 1}
|
||||
)
|
||||
|
||||
return res
|
||||
|
||||
async def modify_track(
|
||||
self,
|
||||
gid: int,
|
||||
track: Track | dict[str, Any] | list[dict[str, Any]] | list[Track],
|
||||
list_type: Literal['next', 'previous'],
|
||||
operation: Literal['insert', 'append', 'extend', 'pop_start', 'pop_end']
|
||||
) -> dict[str, Any] | None:
|
||||
field = f"{list_type}_tracks"
|
||||
track_data = self._normalize_track_data(track)
|
||||
|
||||
operations = {
|
||||
'insert': {'$push': {field: {'$each': track_data, '$position': 0}}},
|
||||
'append': {'$push': {field: {'$each': track_data}}},
|
||||
'extend': {'$push': {field: {'$each': track_data}}},
|
||||
'pop_start': {'$pop': {field: -1}},
|
||||
'pop_end': {'$pop': {field: 1}}
|
||||
}
|
||||
|
||||
update = operations[operation]
|
||||
try:
|
||||
await guilds.update_one(
|
||||
{'_id': gid},
|
||||
update,
|
||||
array_filters=None
|
||||
)
|
||||
return await self._get_popped_track(gid, field, operation)
|
||||
except DuplicateKeyError:
|
||||
await self._handle_duplicate_error(gid, field)
|
||||
return await self.modify_track(gid, track, list_type, operation)
|
||||
|
||||
def _normalize_track_data(self, track: Track | dict | list) -> list[dict]:
|
||||
if not isinstance(track, list):
|
||||
track = [track]
|
||||
|
||||
return [
|
||||
t.to_dict() if isinstance(t, Track) else t
|
||||
for t in track
|
||||
]
|
||||
|
||||
async def pop_random_track(self, gid: int, field: Literal['next', 'previous']) -> dict[str, Any] | None:
|
||||
tracks = await self.get_tracks_list(gid, field)
|
||||
track = tracks.pop(randint(0, len(tracks) - 1)) if tracks else None
|
||||
await self.update(gid, {f"{field}_tracks": tracks})
|
||||
return track
|
||||
|
||||
def modify_track(
|
||||
self, gid: int,
|
||||
track: Track | dict[str, Any] | list[dict[str, Any]] | list[Track],
|
||||
type: Literal['next', 'previous'],
|
||||
operation: Literal['insert', 'append', 'extend', 'pop_start', 'pop_end', 'pop_random']
|
||||
) -> dict[str, Any] | None:
|
||||
"""Perform operation of given type on tracks list of given type.
|
||||
async def get_current_menu(self, gid: int) -> int | None:
|
||||
guild = await self.get_guild(gid, projection={'current_menu': 1})
|
||||
return guild['current_menu']
|
||||
|
||||
Args:
|
||||
gid (int): Guild id.
|
||||
track (Track | dict[str, Any]): yandex_music.Track or a dictionary convertable to it.
|
||||
type (Literal['current', 'next', 'previous']): List type.
|
||||
operation (Literal['insert', 'append', 'pop_start', 'pop_end']): Operation type.
|
||||
async def _get_popped_track(self, gid: int, field: str, operation: str) -> dict[str, Any] | None:
|
||||
if operation not in ('pop_start', 'pop_end', 'pop_random'):
|
||||
return None
|
||||
|
||||
Returns:
|
||||
dict[str, Any] | None: Dictionary convertable to yandex_music.Track or None.
|
||||
"""
|
||||
guild = self.get_guild(gid)
|
||||
|
||||
if type not in ('next', 'previous'):
|
||||
raise ValueError(f"Type must be either 'next' or 'previous', not '{type}'")
|
||||
explicit_type: Literal['next_tracks', 'previous_tracks'] = type + '_tracks' # type: ignore[assignment]
|
||||
tracks = guild[explicit_type]
|
||||
pop_track = None
|
||||
|
||||
if isinstance(track, list):
|
||||
tracks_list = []
|
||||
for _track in track:
|
||||
if isinstance(_track, Track):
|
||||
tracks_list.append(_track.to_dict())
|
||||
else:
|
||||
tracks_list.append(_track)
|
||||
|
||||
if operation != 'extend':
|
||||
raise ValueError('Can only use extend operation on lists.')
|
||||
else:
|
||||
tracks.extend(tracks_list)
|
||||
self.update(gid, {explicit_type: tracks}) # type: ignore
|
||||
else:
|
||||
if isinstance(track, Track):
|
||||
track = track.to_dict()
|
||||
if operation == 'insert':
|
||||
if type == 'previous' and len(tracks) > 50:
|
||||
tracks.pop()
|
||||
tracks.insert(0, track)
|
||||
elif operation == 'append':
|
||||
tracks.append(track)
|
||||
elif operation == 'pop_start':
|
||||
pop_track = tracks.pop(0)
|
||||
elif operation == 'pop_end':
|
||||
pop_track = tracks.pop(-1)
|
||||
elif operation == 'pop_random':
|
||||
pop_track = tracks.pop(randint(0, len(tracks)))
|
||||
elif operation == 'extend':
|
||||
raise ValueError('Can only use extend operation on lists.')
|
||||
else:
|
||||
raise ValueError(f"Unknown operation '{operation}'")
|
||||
guild = await self.get_guild(gid, projection={field: 1})
|
||||
tracks = guild.get(field, [])
|
||||
|
||||
self.update(gid, {explicit_type: tracks}) # type: ignore
|
||||
|
||||
return pop_track
|
||||
|
||||
def get_random_track(self, gid: int) -> dict[str, Any] | None:
|
||||
"""Pop random track from the queue.
|
||||
|
||||
Args:
|
||||
gid (int): Guild id.
|
||||
|
||||
Returns:
|
||||
dict[str, Any] | None: Dictionary covertable to yandex_musci.Track or None
|
||||
"""
|
||||
tracks = self.get_tracks_list(gid, 'next')
|
||||
if not tracks:
|
||||
return None
|
||||
track = tracks.pop(randint(0, len(tracks)))
|
||||
self.update(gid, {'next_tracks': tracks})
|
||||
return track
|
||||
|
||||
def get_current_menu(self, gid: int) -> int | None:
|
||||
"""Get current menu.
|
||||
|
||||
if operation == 'pop_start':
|
||||
return tracks[0]
|
||||
elif operation == 'pop_end':
|
||||
return tracks[-1]
|
||||
elif operation == 'pop_random':
|
||||
return tracks[randint(0, len(tracks) - 1)]
|
||||
|
||||
return None
|
||||
|
||||
async def _handle_duplicate_error(self, gid: int, field: str) -> None:
|
||||
"""Handle duplicate key errors by cleaning up the array."""
|
||||
guild = await self.get_guild(gid, projection={field: 1})
|
||||
tracks = guild.get(field, [])
|
||||
|
||||
Args:
|
||||
gid (int): Guild id.
|
||||
if not tracks:
|
||||
return
|
||||
|
||||
# Remove duplicates while preserving order
|
||||
unique_tracks = []
|
||||
seen = set()
|
||||
for track in tracks:
|
||||
track_id = track.get('id')
|
||||
if track_id not in seen:
|
||||
seen.add(track_id)
|
||||
unique_tracks.append(track)
|
||||
|
||||
await guilds.update_one(
|
||||
{'_id': gid},
|
||||
{'$set': {field: unique_tracks}}
|
||||
)
|
||||
|
||||
async def set_current_track(self, gid: int, track: Track | dict[str, Any]) -> None:
|
||||
"""Set the current track and update the previous tracks list."""
|
||||
if isinstance(track, Track):
|
||||
track = track.to_dict()
|
||||
|
||||
await guilds.update_one(
|
||||
{'_id': gid},
|
||||
{
|
||||
'$set': {'current_track': track}
|
||||
}
|
||||
)
|
||||
|
||||
async def clear_tracks(self, gid: int, list_type: Literal['next', 'previous']) -> None:
|
||||
"""Clear the specified tracks list."""
|
||||
field = f"{list_type}_tracks"
|
||||
await guilds.update_one(
|
||||
{'_id': gid},
|
||||
{'$set': {field: []}}
|
||||
)
|
||||
|
||||
async def shuffle_tracks(self, gid: int, list_type: Literal['next', 'previous']) -> None:
|
||||
"""Shuffle the specified tracks list."""
|
||||
field = f"{list_type}_tracks"
|
||||
guild = await self.get_guild(gid, projection={field: 1})
|
||||
tracks = guild.get(field, [])
|
||||
|
||||
if not tracks:
|
||||
return
|
||||
|
||||
shuffled_tracks = tracks.copy()
|
||||
for i in range(len(shuffled_tracks) - 1, 0, -1):
|
||||
j = randint(0, i)
|
||||
shuffled_tracks[i], shuffled_tracks[j] = shuffled_tracks[j], shuffled_tracks[i]
|
||||
|
||||
await guilds.update_one(
|
||||
{'_id': gid},
|
||||
{'$set': {field: shuffled_tracks}}
|
||||
)
|
||||
|
||||
async def move_track(
|
||||
self,
|
||||
gid: int,
|
||||
from_list: Literal['next', 'previous'],
|
||||
to_list: Literal['next', 'previous'],
|
||||
track_index: int
|
||||
) -> bool:
|
||||
"""Move a track from one list to another."""
|
||||
from_field = f"{from_list}_tracks"
|
||||
to_field = f"{to_list}_tracks"
|
||||
|
||||
Returns: int | None: Menu message id or None if not present.
|
||||
"""
|
||||
guild = self.get_guild(gid)
|
||||
return guild['current_menu']
|
||||
if from_field not in ('next_tracks', 'previous_tracks') or to_field not in ('next_tracks', 'previous_tracks'):
|
||||
raise ValueError(f"Invalid list type: '{from_field}'")
|
||||
|
||||
guild = await guilds.find_one(
|
||||
{'_id': gid},
|
||||
projection={from_field: 1, to_field: 1},
|
||||
)
|
||||
|
||||
if not guild or not guild.get(from_field) or track_index >= len(guild[from_field]):
|
||||
return False
|
||||
|
||||
track = guild[from_field].pop(track_index)
|
||||
updates = [
|
||||
UpdateOne(
|
||||
{'_id': gid},
|
||||
{'$set': {from_field: guild[from_field]}},
|
||||
),
|
||||
UpdateOne(
|
||||
{'_id': gid},
|
||||
{'$push': {to_field: {'$each': [track], '$position': 0}}},
|
||||
)
|
||||
]
|
||||
|
||||
await guilds.bulk_write(updates)
|
||||
return True
|
||||
|
||||
async def get_track_count(self, gid: int, list_type: Literal['next', 'previous']) -> int:
|
||||
"""Get the count of tracks in the specified list."""
|
||||
field = f"{list_type}_tracks"
|
||||
guild = await self.get_guild(gid, projection={field: 1})
|
||||
return len(guild.get(field, []))
|
||||
|
||||
async def set_current_menu(self, gid: int, menu_id: int | None) -> None:
|
||||
"""Set the current menu message ID."""
|
||||
await guilds.update_one(
|
||||
{'_id': gid},
|
||||
{'$set': {'current_menu': menu_id}}
|
||||
)
|
||||
Reference in New Issue
Block a user