feat: support playlistSongIndex

pull/8/head
WorldObservationLog 5 months ago
parent 9e58e1dc66
commit 43fb4679a7
  1. 6
      src/metadata.py
  2. 1
      src/models/playlist_info.py
  3. 14
      src/rip.py
  4. 15
      src/utils.py

@ -25,6 +25,7 @@ class SongMetadata(BaseModel):
record_company: Optional[str] = None record_company: Optional[str] = None
upc: Optional[str] = None upc: Optional[str] = None
isrc: Optional[str] = None isrc: Optional[str] = None
playlistIndex: Optional[int] = None
def to_itags_params(self, embed_metadata: list[str]): def to_itags_params(self, embed_metadata: list[str]):
tags = [] tags = []
@ -32,6 +33,8 @@ class SongMetadata(BaseModel):
if not value: if not value:
continue continue
if key in embed_metadata and value: if key in embed_metadata and value:
if "playlist" in key:
continue
if key == "cover": if key == "cover":
continue continue
if key == "lyrics": if key == "lyrics":
@ -60,3 +63,6 @@ class SongMetadata(BaseModel):
async def get_cover(self, cover_format: str, cover_size: str): async def get_cover(self, cover_format: str, cover_size: str):
self.cover = await get_cover(self.cover_url, cover_format, cover_size) self.cover = await get_cover(self.cover_url, cover_format, cover_size)
def set_playlist_index(self, index: int):
self.playlistIndex = index

@ -132,3 +132,4 @@ class Datum(BaseModel):
class PlaylistInfo(BaseModel): class PlaylistInfo(BaseModel):
data: List[Datum] data: List[Datum]
songIdIndexMapping: dict[str, int] = {}

@ -14,7 +14,7 @@ from src.mp4 import extract_media, extract_song, encapsulate, write_metadata
from src.save import save from src.save import save
from src.types import GlobalAuthParams, Codec from src.types import GlobalAuthParams, Codec
from src.url import Song, Album, URLType, Artist, Playlist from src.url import Song, Album, URLType, Artist, Playlist
from src.utils import check_song_exists, if_raw_atmos from src.utils import check_song_exists, if_raw_atmos, playlist_write_song_index
@logger.catch @logger.catch
@ -24,6 +24,8 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config
token = auth_params.anonymousAccessToken token = auth_params.anonymousAccessToken
song_data = await get_song_info(song.id, token, song.storefront, config.region.language) song_data = await get_song_info(song.id, token, song.storefront, config.region.language)
song_metadata = SongMetadata.parse_from_song_data(song_data) song_metadata = SongMetadata.parse_from_song_data(song_data)
if playlist:
song_metadata.set_playlist_index(playlist.songIdIndexMapping.get(song.id))
logger.info(f"Ripping song: {song_metadata.artist} - {song_metadata.title}") logger.info(f"Ripping song: {song_metadata.artist} - {song_metadata.title}")
if not force_save and check_song_exists(song_metadata, config.download, codec, playlist): if not force_save and check_song_exists(song_metadata, config.download, codec, playlist):
logger.info(f"Song: {song_metadata.artist} - {song_metadata.title} already exists") logger.info(f"Song: {song_metadata.artist} - {song_metadata.title} already exists")
@ -74,13 +76,17 @@ async def rip_album(album: Album, auth_params: GlobalAuthParams, codec: str, con
async def rip_playlist(playlist: Playlist, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device, async def rip_playlist(playlist: Playlist, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device,
force_save: bool = False): force_save: bool = False):
playlist_info = await get_playlist_info_and_tracks(playlist.id, auth_params.anonymousAccessToken, playlist.storefront, playlist_info = await get_playlist_info_and_tracks(playlist.id, auth_params.anonymousAccessToken,
playlist.storefront,
config.region.language) config.region.language)
logger.info(f"Ripping Playlist: {playlist_info.data[0].attributes.curatorName} - {playlist_info.data[0].attributes.name}") playlist_info = playlist_write_song_index(playlist_info)
logger.info(
f"Ripping Playlist: {playlist_info.data[0].attributes.curatorName} - {playlist_info.data[0].attributes.name}")
async with asyncio.TaskGroup() as tg: async with asyncio.TaskGroup() as tg:
for track in playlist_info.data[0].relationships.tracks.data: for track in playlist_info.data[0].relationships.tracks.data:
song = Song(id=track.id, storefront=playlist.storefront, url="", type=URLType.Song) song = Song(id=track.id, storefront=playlist.storefront, url="", type=URLType.Song)
tg.create_task(rip_song(song, auth_params, codec, config, device, force_save=force_save, playlist=playlist_info)) tg.create_task(
rip_song(song, auth_params, codec, config, device, force_save=force_save, playlist=playlist_info))
logger.info( logger.info(
f"Playlist: {playlist_info.data[0].attributes.curatorName} - {playlist_info.data[0].attributes.name} finished ripping") f"Playlist: {playlist_info.data[0].attributes.curatorName} - {playlist_info.data[0].attributes.name} finished ripping")

@ -147,11 +147,12 @@ def playlist_metadata_to_params(playlist: PlaylistInfo):
"playlistCuratorName": playlist.data[0].attributes.curatorName} "playlistCuratorName": playlist.data[0].attributes.curatorName}
def get_song_name_and_dir_path(codec: str, config: Download, metadata, playlist: PlaylistMeta = None): def get_song_name_and_dir_path(codec: str, config: Download, metadata, playlist: PlaylistInfo = None):
if playlist: if playlist:
song_name = config.playlistSongNameFormat.format(codec=codec, **metadata.model_dump(), song_name = config.playlistSongNameFormat.format(codec=codec, playlistSongIndex=metadata.playlistIndex,
**playlist_metadata_to_params(playlist)) **metadata.model_dump())
dir_path = Path(config.playlistDirPathFormat.format(codec=codec, **metadata.model_dump(), dir_path = Path(config.playlistDirPathFormat.format(codec=codec,
**metadata.model_dump(),
**playlist_metadata_to_params(playlist))) **playlist_metadata_to_params(playlist)))
else: else:
song_name = config.songNameFormat.format(codec=codec, **metadata.model_dump()) song_name = config.songNameFormat.format(codec=codec, **metadata.model_dump())
@ -160,3 +161,9 @@ def get_song_name_and_dir_path(codec: str, config: Download, metadata, playlist:
song_name = get_valid_filename(song_name) song_name = get_valid_filename(song_name)
dir_path = Path(*[get_valid_filename(part) if ":\\" not in part else part for part in dir_path.parts]) dir_path = Path(*[get_valid_filename(part) if ":\\" not in part else part for part in dir_path.parts])
return song_name, dir_path return song_name, dir_path
def playlist_write_song_index(playlist: PlaylistInfo):
for track_index, track in enumerate(playlist.data[0].relationships.tracks.data):
playlist.songIdIndexMapping[track.id] = track_index + 1
return playlist

Loading…
Cancel
Save