feat: playlist download

pull/8/head
WorldObservationLog 5 months ago
parent 1ce370a773
commit af2a3193bc
  1. 8
      config.example.toml
  2. 67
      src/api.py
  3. 14
      src/cmd.py
  4. 2
      src/config.py
  5. 2
      src/metadata.py
  6. 2
      src/models/__init__.py
  7. 12
      src/models/artist_albums.py
  8. 2
      src/models/artist_info.py
  9. 14
      src/models/artist_songs.py
  10. 134
      src/models/playlist_info.py
  11. 73
      src/models/plsylist_tracks.py
  12. 13
      src/mp4.py
  13. 49
      src/rip.py
  14. 18
      src/save.py
  15. 49
      src/utils.py

@ -43,6 +43,14 @@ atmosConventToM4a = true
songNameFormat = "{disk}-{tracknum:02d} {title}"
# Ditto
dirPathFormat = "downloads/{album_artist}/{album}"
# Available values:
# title, artist, album, album_artist, composer,
# genre, created, track, tracknum, disk,
# record_company, upc, isrc, copyright,
# playlistName, playlistCuratorName
playlistDirPathFormat = "downloads/playlists/{playlistName}"
# Ditto
playlistSongNameFormat = "{artist} - {title}"
# Save lyrics as .lrc file
saveLyrics = true
saveCover = true

@ -69,37 +69,46 @@ async def download_song(url: str) -> bytes:
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_meta(album_id: str, token: str, storefront: str, lang: str):
if "pl." in album_id:
mtype = "playlists"
else:
mtype = "albums"
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/{mtype}/{album_id}",
async def get_album_info(album_id: str, token: str, storefront: str, lang: str):
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/albums/{album_id}",
params={"omit[resource]": "autos", "include": "tracks,artists,record-labels",
"include[songs]": "artists", "fields[artists]": "name",
"fields[albums:albums]": "artistName,artwork,name,releaseDate,url",
"fields[record-labels]": "name", "l": lang},
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
"Origin": "https://music.apple.com"})
if mtype == "albums":
return AlbumMeta.model_validate(req.json())
else:
result = PlaylistMeta.model_validate(req.json())
result.data[0].attributes.artistName = "Apple Music"
if result.data[0].relationships.tracks.next:
page = 0
while True:
page += 100
page_req = await client.get(
f"https://amp-api.music.apple.com/v1/catalog/{storefront}/{mtype}/{album_id}/tracks",
params={"offset": page, "l": lang},
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
"Origin": "https://music.apple.com"})
page_result = TracksMeta.model_validate(page_req.json())
result.data[0].relationships.tracks.data.extend(page_result.data)
if not page_result.next:
break
return result
return AlbumMeta.model_validate(req.json())
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_playlist_info_and_tracks(playlist_id: str, token: str, storefront: str, lang: str):
resp = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/playlists/{playlist_id}",
params={"l": lang},
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
"Origin": "https://music.apple.com"})
playlist_info_obj = PlaylistInfo.parse_obj(resp.json())
if playlist_info_obj.data[0].relationships.tracks.next:
all_tracks = await get_playlist_tracks(playlist_id, token, storefront, lang)
playlist_info_obj.data[0].relationships.tracks = all_tracks
return playlist_info_obj
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_playlist_tracks(playlist_id: str, token: str, storefront: str, lang: str, offset: int = 0):
resp = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/playlists/{playlist_id}/tracks",
params={"l": lang, "offset": offset},
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
"Origin": "https://music.apple.com"})
playlist_tracks = PlaylistTracks.parse_obj(resp.json())
tracks = playlist_tracks.data
if playlist_tracks.next:
next_tracks = await get_playlist_info_and_tracks(playlist_id, token, storefront, lang, offset + 100)
tracks.extend(next_tracks)
return tracks
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
@ -115,14 +124,14 @@ async def get_cover(url: str, cover_format: str, cover_size: str):
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_info_from_adam(adam_id: str, token: str, storefront: str, lang: str):
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/songs/{adam_id}",
async def get_song_info(song_id: str, token: str, storefront: str, lang: str):
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/songs/{song_id}",
params={"extend": "extendedAssetUrls", "include": "albums", "l": lang},
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_itunes,
"Origin": "https://music.apple.com"})
song_data_obj = SongData.model_validate(req.json())
for data in song_data_obj.data:
if data.id == adam_id:
if data.id == song_id:
return data
return None
@ -180,4 +189,4 @@ async def get_artist_info(artist_id: str, storefront: str, token: str, lang: str
params={"l": lang},
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
"Origin": "https://music.apple.com"})
return ArtistInfo.parse_obj(resp.json())
return ArtistInfo.parse_obj(resp.json())

@ -9,9 +9,9 @@ from prompt_toolkit import PromptSession, print_formatted_text, ANSI
from prompt_toolkit.patch_stdout import patch_stdout
from src.adb import Device
from src.api import get_token, init_client_and_lock, upload_m3u8_to_api, get_info_from_adam
from src.api import get_token, init_client_and_lock, upload_m3u8_to_api, get_song_info
from src.config import Config
from src.rip import rip_song, rip_album, rip_artist
from src.rip import rip_song, rip_album, rip_artist, rip_playlist
from src.types import GlobalAuthParams
from src.url import AppleMusicURL, URLType, Song
from src.utils import get_song_id_from_m3u8
@ -100,10 +100,14 @@ class NewInteractiveShell:
task = self.loop.create_task(
rip_song(url, global_auth_param, codec, self.config, available_device, force_download))
case URLType.Album:
task = self.loop.create_task(rip_album(url, global_auth_param, codec, self.config, available_device))
task = self.loop.create_task(rip_album(url, global_auth_param, codec, self.config, available_device,
force_download))
case URLType.Artist:
task = self.loop.create_task(rip_artist(url, global_auth_param, codec, self.config, available_device,
force_download, include))
case URLType.Playlist:
task = self.loop.create_task(rip_playlist(url, global_auth_param, codec, self.config, available_device,
force_download))
case _:
logger.error("Unsupported URLType")
return
@ -129,8 +133,8 @@ class NewInteractiveShell:
tasks = set()
async def upload(song_id: str, m3u8_url: str):
song_info = await get_info_from_adam(song_id, self.anonymous_access_token,
self.config.region.defaultStorefront, self.config.region.language)
song_info = await get_song_info(song_id, self.anonymous_access_token,
self.config.region.defaultStorefront, self.config.region.language)
await upload_m3u8_to_api(self.config.m3u8Api.endpoint, m3u8_url, song_info)
def callback(m3u8_url):

@ -28,6 +28,8 @@ class Download(BaseModel):
atmosConventToM4a: bool
songNameFormat: str
dirPathFormat: str
playlistDirPathFormat: str
playlistSongNameFormat: str
saveLyrics: bool
saveCover: bool
coverFormat: str

@ -26,7 +26,7 @@ class SongMetadata(BaseModel):
upc: Optional[str] = None
isrc: Optional[str] = None
def to_itags_params(self, embed_metadata: list[str], cover_format: str):
def to_itags_params(self, embed_metadata: list[str]):
tags = []
for key, value in self.model_dump().items():
if not value:

@ -6,3 +6,5 @@ from src.models.tracks_meta import TracksMeta
from src.models.artist_albums import ArtistAlbums
from src.models.artist_songs import ArtistSongs
from src.models.artist_info import ArtistInfo
from src.models.playlist_info import PlaylistInfo
from src.models.plsylist_tracks import PlaylistTracks

@ -14,7 +14,7 @@ class Artwork(BaseModel):
textColor4: Optional[str] = None
textColor1: Optional[str] = None
bgColor: Optional[str] = None
hasP3: bool
hasP3: Optional[bool] = None
class PlayParams(BaseModel):
@ -32,20 +32,20 @@ class Attributes(BaseModel):
copyright: Optional[str] = None
genreNames: List[str]
releaseDate: Optional[str] = None
isMasteredForItunes: bool
isMasteredForItunes: Optional[bool] = None
upc: Optional[str] = None
artwork: Artwork
url: Optional[str] = None
playParams: PlayParams
recordLabel: Optional[str] = None
trackCount: Optional[int] = None
isCompilation: bool
isPrerelease: bool
isCompilation: Optional[bool] = None
isPrerelease: Optional[bool] = None
audioTraits: List[str]
isSingle: bool
isSingle: Optional[bool] = None
name: Optional[str] = None
artistName: Optional[str] = None
isComplete: bool
isComplete: Optional[bool] = None
editorialNotes: Optional[EditorialNotes] = None

@ -14,7 +14,7 @@ class Artwork(BaseModel):
textColor4: Optional[str] = None
textColor1: Optional[str] = None
bgColor: Optional[str] = None
hasP3: bool
hasP3: Optional[bool] = None
class Attributes(BaseModel):

@ -14,7 +14,7 @@ class Artwork(BaseModel):
textColor4: Optional[str] = None
textColor1: Optional[str] = None
bgColor: Optional[str] = None
hasP3: bool
hasP3: Optional[bool] = None
class PlayParams(BaseModel):
@ -27,14 +27,14 @@ class Preview(BaseModel):
class Attributes(BaseModel):
hasTimeSyncedLyrics: bool
hasTimeSyncedLyrics: Optional[bool] = None
albumName: Optional[str] = None
genreNames: List[str]
trackNumber: Optional[int] = None
releaseDate: Optional[str] = None
durationInMillis: Optional[int] = None
isVocalAttenuationAllowed: bool
isMasteredForItunes: bool
isVocalAttenuationAllowed: Optional[bool] = None
isMasteredForItunes: Optional[bool] = None
isrc: Optional[str] = None
artwork: Artwork
audioLocale: Optional[str] = None
@ -42,9 +42,9 @@ class Attributes(BaseModel):
url: Optional[str] = None
playParams: PlayParams
discNumber: Optional[int] = None
hasCredits: bool
hasLyrics: bool
isAppleDigitalMaster: bool
hasCredits: Optional[bool] = None
hasLyrics: Optional[bool] = None
isAppleDigitalMaster: Optional[bool] = None
audioTraits: List[str]
name: Optional[str] = None
previews: List[Preview]

@ -0,0 +1,134 @@
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel
class Description(BaseModel):
standard: Optional[str] = None
class Artwork(BaseModel):
width: Optional[int] = None
height: Optional[int] = None
url: Optional[str] = None
hasP3: Optional[bool] = None
class PlayParams(BaseModel):
id: Optional[str] = None
kind: Optional[str] = None
versionHash: Optional[str] = None
class Attributes(BaseModel):
hasCollaboration: Optional[bool] = None
curatorName: Optional[str] = None
lastModifiedDate: Optional[str] = None
audioTraits: List
name: Optional[str] = None
isChart: Optional[bool] = None
supportsSing: Optional[bool] = None
playlistType: Optional[str] = None
description: Optional[Description] = None
artwork: Optional[Artwork] = None
playParams: PlayParams
url: Optional[str] = None
class Datum1(BaseModel):
id: Optional[str] = None
type: Optional[str] = None
href: Optional[str] = None
class Curator(BaseModel):
href: Optional[str] = None
data: List[Datum1]
class Artwork1(BaseModel):
width: Optional[int] = None
url: Optional[str] = None
height: Optional[int] = None
textColor3: Optional[str] = None
textColor2: Optional[str] = None
textColor4: Optional[str] = None
textColor1: Optional[str] = None
bgColor: Optional[str] = None
hasP3: Optional[bool] = None
class PlayParams1(BaseModel):
id: Optional[str] = None
kind: Optional[str] = None
class Preview(BaseModel):
url: Optional[str] = None
class Attributes1(BaseModel):
albumName: Optional[str] = None
hasTimeSyncedLyrics: Optional[bool] = None
genreNames: List[str]
trackNumber: Optional[int] = None
releaseDate: Optional[str] = None
durationInMillis: Optional[int] = None
isVocalAttenuationAllowed: Optional[bool] = None
isMasteredForItunes: Optional[bool] = None
isrc: Optional[str] = None
artwork: Artwork1
composerName: Optional[str] = None
audioLocale: Optional[str] = None
url: Optional[str] = None
playParams: PlayParams1
discNumber: Optional[int] = None
hasCredits: Optional[bool] = None
isAppleDigitalMaster: Optional[bool] = None
hasLyrics: Optional[bool] = None
audioTraits: List[str]
name: Optional[str] = None
previews: List[Preview]
artistName: Optional[str] = None
class ContentVersion(BaseModel):
RTCI: Optional[int] = None
MZ_INDEXER: Optional[int] = None
class Meta(BaseModel):
contentVersion: ContentVersion
class Datum2(BaseModel):
id: Optional[str] = None
type: Optional[str] = None
href: Optional[str] = None
attributes: Attributes1
meta: Meta
class Tracks(BaseModel):
href: Optional[str] = None
next: Optional[str] = None
data: List[Datum2]
class Relationships(BaseModel):
curator: Curator
tracks: Tracks
class Datum(BaseModel):
id: Optional[str] = None
type: Optional[str] = None
href: Optional[str] = None
attributes: Attributes
relationships: Relationships
class PlaylistInfo(BaseModel):
data: List[Datum]

@ -0,0 +1,73 @@
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel
class Artwork(BaseModel):
width: Optional[int] = None
url: Optional[str] = None
height: Optional[int] = None
textColor3: Optional[str] = None
textColor2: Optional[str] = None
textColor4: Optional[str] = None
textColor1: Optional[str] = None
bgColor: Optional[str] = None
hasP3: Optional[bool] = None
class PlayParams(BaseModel):
id: Optional[str] = None
kind: Optional[str] = None
class Preview(BaseModel):
url: Optional[str] = None
class Attributes(BaseModel):
albumName: Optional[str] = None
hasTimeSyncedLyrics: Optional[bool] = None
genreNames: List[str]
trackNumber: Optional[int] = None
releaseDate: Optional[str] = None
durationInMillis: Optional[int] = None
isVocalAttenuationAllowed: Optional[bool] = None
isMasteredForItunes: Optional[bool] = None
isrc: Optional[str] = None
artwork: Artwork
audioLocale: Optional[str] = None
composerName: Optional[str] = None
url: Optional[str] = None
playParams: PlayParams
discNumber: Optional[int] = None
hasCredits: Optional[bool] = None
isAppleDigitalMaster: Optional[bool] = None
hasLyrics: Optional[bool] = None
audioTraits: List[str]
name: Optional[str] = None
previews: List[Preview]
artistName: Optional[str] = None
class ContentVersion(BaseModel):
RTCI: Optional[int] = None
MZ_INDEXER: Optional[int] = None
class Meta(BaseModel):
contentVersion: ContentVersion
class Datum(BaseModel):
id: Optional[str] = None
type: Optional[str] = None
href: Optional[str] = None
attributes: Attributes
meta: Meta
class PlaylistTracks(BaseModel):
next: Optional[str] = None
data: List[Datum]

@ -14,7 +14,7 @@ from loguru import logger
from src.exceptions import CodecNotFoundException
from src.metadata import SongMetadata
from src.types import *
from src.utils import find_best_codec, get_codec_from_codec_id
from src.utils import find_best_codec, get_codec_from_codec_id, get_suffix
async def get_available_codecs(m3u8_url: str) -> Tuple[list[str], list[str]]:
@ -25,7 +25,7 @@ async def get_available_codecs(m3u8_url: str) -> Tuple[list[str], list[str]]:
async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata,
codec_priority: list[str], alternative_codec: bool = False ) -> Tuple[str, list[str]]:
codec_priority: list[str], alternative_codec: bool = False) -> Tuple[str, list[str]]:
parsed_m3u8 = m3u8.load(m3u8_url)
specifyPlaylist = find_best_codec(parsed_m3u8, codec)
if not specifyPlaylist and alternative_codec:
@ -117,12 +117,7 @@ def encapsulate(song_info: SongInfo, decrypted_media: bytes, atmos_convent: bool
media = Path(tmp_dir.name) / Path(name).with_suffix(".media")
with open(media.absolute(), "wb") as f:
f.write(decrypted_media)
if song_info.codec == Codec.EC3 and not atmos_convent:
song_name = Path(tmp_dir.name) / Path(name).with_suffix(".ec3")
elif song_info.codec == Codec.AC3 and not atmos_convent:
song_name = Path(tmp_dir.name) / Path(name).with_suffix(".ac3")
else:
song_name = Path(tmp_dir.name) / Path(name).with_suffix(".m4a")
song_name = Path(tmp_dir.name) / Path(name).with_suffix(get_suffix(song_info.codec, atmos_convent))
match song_info.codec:
case Codec.ALAC:
nhml_name = Path(tmp_dir.name) / Path(f"{name}.nhml")
@ -181,7 +176,7 @@ def write_metadata(song: bytes, metadata: SongMetadata, embed_metadata: list[str
time = datetime.strptime(metadata.created, "%Y-%m-%d").strftime("%d/%m/%Y")
subprocess.run(["mp4box", "-time", time, "-mtime", time, "-keep-utc", "-name", f"1={metadata.title}", "-itags",
":".join(["tool=", f"cover={absolute_cover_path}",
metadata.to_itags_params(embed_metadata, cover_format)]),
metadata.to_itags_params(embed_metadata)]),
song_name.absolute()], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
with open(song_name.absolute(), "rb") as f:
embed_song = f.read()

@ -3,27 +3,29 @@ import subprocess
from loguru import logger
from src.api import (get_info_from_adam, get_song_lyrics, get_meta, download_song,
get_m3u8_from_api, get_artist_info, get_songs_from_artist, get_albums_from_artist)
from src.api import (get_song_info, get_song_lyrics, get_album_info, download_song,
get_m3u8_from_api, get_artist_info, get_songs_from_artist, get_albums_from_artist,
get_playlist_info_and_tracks)
from src.config import Config, Device
from src.decrypt import decrypt
from src.metadata import SongMetadata
from src.models import PlaylistMeta
from src.mp4 import extract_media, extract_song, encapsulate, write_metadata
from src.save import save
from src.types import GlobalAuthParams, Codec
from src.url import Song, Album, URLType, Artist
from src.utils import check_song_exists
from src.url import Song, Album, URLType, Artist, Playlist
from src.utils import check_song_exists, if_raw_atmos
@logger.catch
async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device,
force_save: bool = False, specified_m3u8: str = ""):
force_save: bool = False, specified_m3u8: str = "", playlist: PlaylistMeta = None):
logger.debug(f"Task of song id {song.id} was created")
token = auth_params.anonymousAccessToken
song_data = await get_info_from_adam(song.id, token, song.storefront, config.region.language)
song_data = await get_song_info(song.id, token, song.storefront, config.region.language)
song_metadata = SongMetadata.parse_from_song_data(song_data)
logger.info(f"Ripping song: {song_metadata.artist} - {song_metadata.title}")
if not force_save and check_song_exists(song_metadata, config.download, codec):
if not force_save and check_song_exists(song_metadata, config.download, codec, playlist):
logger.info(f"Song: {song_metadata.artist} - {song_metadata.title} already exists")
return
await song_metadata.get_cover(config.download.coverFormat, config.download.coverSize)
@ -47,11 +49,9 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config
song_info = extract_song(raw_song, codec)
decrypted_song = await decrypt(song_info, keys, song_data, device)
song = encapsulate(song_info, decrypted_song, config.download.atmosConventToM4a)
if codec != Codec.EC3 or (codec == Codec.EC3 and config.download.atmosConventToM4a):
if not if_raw_atmos(codec, config.download.atmosConventToM4a):
song = write_metadata(song, song_metadata, config.metadata.embedMetadata, config.download.coverFormat)
elif codec != Codec.AC3 or (codec == Codec.AC3 and config.download.atmosConventToM4a):
song = write_metadata(song, song_metadata, config.metadata.embedMetadata, config.download.coverFormat)
filename = save(song, codec, song_metadata, config.download)
filename = save(song, codec, song_metadata, config.download, playlist)
logger.info(f"Song {song_metadata.artist} - {song_metadata.title} saved!")
if config.download.afterDownloaded:
command = config.download.afterDownloaded.format(filename=filename)
@ -61,7 +61,8 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config
async def rip_album(album: Album, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device,
force_save: bool = False):
album_info = await get_meta(album.id, auth_params.anonymousAccessToken, album.storefront, config.region.language)
album_info = await get_album_info(album.id, auth_params.anonymousAccessToken, album.storefront,
config.region.language)
logger.info(f"Ripping Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}")
async with asyncio.TaskGroup() as tg:
for track in album_info.data[0].relationships.tracks.data:
@ -71,21 +72,33 @@ async def rip_album(album: Album, auth_params: GlobalAuthParams, codec: str, con
f"Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name} finished ripping")
async def rip_playlist():
pass
async def rip_playlist(playlist: Playlist, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device,
force_save: bool = False):
playlist_info = await get_playlist_info_and_tracks(playlist.id, auth_params.anonymousAccessToken, playlist.storefront,
config.region.language)
logger.info(f"Ripping Playlist: {playlist_info.data[0].attributes.curatorName} - {playlist_info.data[0].attributes.name}")
async with asyncio.TaskGroup() as tg:
for track in playlist_info.data[0].relationships.tracks.data:
song = Song(id=track.id, storefront=playlist.storefront, url="", type=URLType.Song)
tg.create_task(rip_song(song, auth_params, codec, config, device, force_save=force_save, playlist=playlist_info))
logger.info(
f"Playlist: {playlist_info.data[0].attributes.curatorName} - {playlist_info.data[0].attributes.name} finished ripping")
async def rip_artist(artist: Artist, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device,
force_save: bool = False, include_participate_in_works: bool = False):
artist_info = await get_artist_info(artist.id, artist.storefront, auth_params.anonymousAccessToken, config.region.language)
artist_info = await get_artist_info(artist.id, artist.storefront, auth_params.anonymousAccessToken,
config.region.language)
logger.info(f"Ripping Artist: {artist_info.data[0].attributes.name}")
async with asyncio.TaskGroup() as tg:
if include_participate_in_works:
songs = await get_songs_from_artist(artist.id, artist.storefront, auth_params.anonymousAccessToken, config.region.language)
songs = await get_songs_from_artist(artist.id, artist.storefront, auth_params.anonymousAccessToken,
config.region.language)
for song_url in songs:
tg.create_task(rip_song(Song.parse_url(song_url), auth_params, codec, config, device, force_save))
else:
albums = await get_albums_from_artist(artist.id, artist.storefront, auth_params.anonymousAccessToken, config.region.language)
albums = await get_albums_from_artist(artist.id, artist.storefront, auth_params.anonymousAccessToken,
config.region.language)
for album_url in albums:
tg.create_task(rip_album(Album.parse_url(album_url), auth_params, codec, config, device, force_save))
logger.info(f"Artist: {artist_info.data[0].attributes.name} finished ripping")
logger.info(f"Artist: {artist_info.data[0].attributes.name} finished ripping")

@ -3,24 +3,18 @@ from pathlib import Path
from src.config import Download
from src.metadata import SongMetadata
from src.types import Codec
from src.utils import ttml_convent_to_lrc, get_valid_filename
from src.models import PlaylistMeta
from src.utils import ttml_convent_to_lrc, get_song_name_and_dir_path, get_suffix
def save(song: bytes, codec: str, metadata: SongMetadata, config: Download):
song_name = get_valid_filename(config.songNameFormat.format(**metadata.model_dump()))
dir_path = Path(config.dirPathFormat.format(**metadata.model_dump()))
def save(song: bytes, codec: str, metadata: SongMetadata, config: Download, playlist: PlaylistMeta = None):
song_name, dir_path = get_song_name_and_dir_path(codec, config, metadata, playlist)
if not dir_path.exists() or not dir_path.is_dir():
os.makedirs(dir_path.absolute())
if codec == Codec.EC3 and not config.atmosConventToM4a:
song_path = dir_path / Path(song_name + ".ec3")
elif codec == Codec.AC3 and not config.atmosConventToM4a:
song_path = dir_path / Path(song_name + ".ac3")
else:
song_path = dir_path / Path(song_name + ".m4a")
song_path = dir_path / Path(song_name + get_suffix(codec, config.atmosConventToM4a))
with open(song_path.absolute(), "wb") as f:
f.write(song)
if config.saveCover:
if config.saveCover and not playlist:
cover_path = dir_path / Path(f"cover.{config.coverFormat}")
with open(cover_path.absolute(), "wb") as f:
f.write(metadata.cover)

@ -1,4 +1,5 @@
import asyncio
import sys
import time
from itertools import islice
from pathlib import Path
@ -9,6 +10,7 @@ from bs4 import BeautifulSoup
from src.config import Download
from src.exceptions import NotTimeSyncedLyricsException
from src.models import PlaylistMeta
from src.types import *
@ -103,15 +105,9 @@ def ttml_convent_to_lrc(ttml: str) -> str:
return "\n".join(lrc_lines)
def check_song_exists(metadata, config: Download, codec: str):
song_name = get_valid_filename(config.songNameFormat.format(**metadata.model_dump()))
dir_path = Path(config.dirPathFormat.format(**metadata.model_dump()))
if not config.atmosConventToM4a and codec == Codec.EC3:
return (Path(dir_path) / Path(song_name).with_suffix(".ec3")).exists()
elif not config.atmosConventToM4a and codec == Codec.AC3:
return (Path(dir_path) / Path(song_name).with_suffix(".ac3")).exists()
else:
return (Path(dir_path) / Path(song_name).with_suffix(".m4a")).exists()
def check_song_exists(metadata, config: Download, codec: str, playlist: PlaylistMeta = None):
song_name, dir_path = get_song_name_and_dir_path(codec, config, metadata, playlist)
return (Path(dir_path) / Path(song_name + get_suffix(codec, config.atmosConventToM4a))).exists()
def get_valid_filename(filename: str):
@ -129,3 +125,38 @@ def get_codec_from_codec_id(codec_id: str) -> str:
def get_song_id_from_m3u8(m3u8_url: str) -> str:
parsed_m3u8 = m3u8.load(m3u8_url)
return regex.search(r"_A(\d*)_", parsed_m3u8.playlists[0].uri)[1]
def if_raw_atmos(codec: str, save_raw_atmos: bool):
if (codec == Codec.EC3 or codec == Codec.AC3) and save_raw_atmos:
return True
return False
def get_suffix(codec: str, save_raw_atmos: bool):
if not save_raw_atmos and codec == Codec.EC3:
return ".ec3"
elif not save_raw_atmos and codec == Codec.AC3:
return ".ac3"
else:
return ".m4a"
def playlist_metadata_to_params(playlist: PlaylistMeta):
return {"playlistName": playlist.data[0].attributes.name,
"playlistCuratorName": playlist.data[0].attributes.curatorName}
def get_song_name_and_dir_path(codec: str, config: Download, metadata, playlist: PlaylistMeta = None):
if playlist:
song_name = config.playlistSongNameFormat.format(codec=codec, **metadata.model_dump(),
**playlist_metadata_to_params(playlist))
dir_path = Path(config.playlistDirPathFormat.format(codec=codec, **metadata.model_dump(),
**playlist_metadata_to_params(playlist)))
else:
song_name = config.songNameFormat.format(codec=codec, **metadata.model_dump())
dir_path = Path(config.dirPathFormat.format(codec=codec, **metadata.model_dump()))
if sys.platform == "win32":
song_name = get_valid_filename(song_name)
dir_path = Path(*[get_valid_filename(part) if ":\\" not in part else part for part in dir_path.parts])
return song_name, dir_path

Loading…
Cancel
Save