Compare commits

..

No commits in common. 'f17b86082689f56ab0f5676725c9767316ffb63f' and '9081672546c87e508b7c23de72b5dceec6fe20e5' have entirely different histories.

  1. 24
      config.example.toml
  2. 7
      src/config.py
  3. 20
      src/metadata.py
  4. 9
      src/mp4.py
  5. 32
      src/rip.py
  6. 22
      src/utils.py

@ -44,23 +44,10 @@ codecPriority = ["alac", "ec3", "ac3", "aac"]
# Encapsulate Atmos(ec-3/ac-3) as M4A and write the song metadata # Encapsulate Atmos(ec-3/ac-3) as M4A and write the song metadata
atmosConventToM4a = true atmosConventToM4a = true
# Follow the Python Format format (https://docs.python.org/3/library/string.html#formatstrings) # Follow the Python Format format (https://docs.python.org/3/library/string.html#formatstrings)
# Write the audio information to the songNameFormat and playlistSongNameFormat
# Only support alac codec
# Available values: bit_depth, sample_rate, sample_rate_kHz, codec
# This feature may slow down the speed of checking for existing songs
# For example:
# audioInfoFormat = " [{codec}][{bit_depth}bit][{sample_rate_kHz}kHz]"
# songNameFormat = "{disk}-{tracknum:02d} {title}{audio_info}"
# When transcribing audio with alac codec, the transcribed file name is:
# 1-01 名もなき何もかも [ALAC][16bit][44.1kHz]
# When transcribing audio with other codecs, the transcribed file name is:
# 1-01 名もなき何もかも
audioInfoFormat = ""
# Available values: # Available values:
# title, artist, album, album_artist, composer, # title, artist, album, album_artist, composer,
# genre, created, track, tracknum, disk, # genre, created, track, tracknum, disk,
# record_company, upc, isrc, copyright, codec, audio_info # record_company, upc, isrc, copyright, codec
# song_id, album_id
songNameFormat = "{disk}-{tracknum:02d} {title}" songNameFormat = "{disk}-{tracknum:02d} {title}"
# Ditto # Ditto
dirPathFormat = "downloads/{album_artist}/{album}" dirPathFormat = "downloads/{album_artist}/{album}"
@ -73,9 +60,8 @@ playlistDirPathFormat = "downloads/playlists/{playlistName}"
# Available values: # Available values:
# title, artist, album, album_artist, composer, # title, artist, album, album_artist, composer,
# genre, created, track, tracknum, disk, # genre, created, track, tracknum, disk,
# record_company, upc, isrc, copyright, audio_info # record_company, upc, isrc, copyright,
# playlistName, playlistCuratorName, playlistSongIndex, codec # playlistName, playlistCuratorName, playlistSongIndex, codec
# song_id, album_id
playlistSongNameFormat = "{playlistSongIndex:02d}. {artist} - {title}" playlistSongNameFormat = "{playlistSongIndex:02d}. {artist} - {title}"
# Save lyrics as .lrc file # Save lyrics as .lrc file
saveLyrics = true saveLyrics = true
@ -97,3 +83,9 @@ afterDownloaded = ""
embedMetadata = ["title", "artist", "album", "album_artist", "composer", embedMetadata = ["title", "artist", "album", "album_artist", "composer",
"genre", "created", "track", "tracknum", "disk", "lyrics", "cover", "copyright", "genre", "created", "track", "tracknum", "disk", "lyrics", "cover", "copyright",
"record_company", "upc", "isrc","rtng"] "record_company", "upc", "isrc","rtng"]
[mitm]
# The host proxy server listens on
host = "127.0.0.1"
# The port proxy server listens on
port = "11080"

@ -30,7 +30,6 @@ class Download(BaseModel):
codecAlternative: bool codecAlternative: bool
codecPriority: list[str] codecPriority: list[str]
atmosConventToM4a: bool atmosConventToM4a: bool
audioInfoFormat: str
songNameFormat: str songNameFormat: str
dirPathFormat: str dirPathFormat: str
playlistDirPathFormat: str playlistDirPathFormat: str
@ -46,12 +45,18 @@ class Metadata(BaseModel):
embedMetadata: list[str] embedMetadata: list[str]
class Mitm(BaseModel):
host: str
port: int
class Config(BaseModel): class Config(BaseModel):
region: Region region: Region
devices: list[Device] devices: list[Device]
m3u8Api: M3U8Api m3u8Api: M3U8Api
download: Download download: Download
metadata: Metadata metadata: Metadata
mitm: Mitm
@classmethod @classmethod
def load_from_config(cls, config_file: str = "config.toml"): def load_from_config(cls, config_file: str = "config.toml"):

@ -6,15 +6,10 @@ from src.api import get_cover
from src.models.song_data import Datum from src.models.song_data import Datum
from src.utils import ttml_convent_to_lrc from src.utils import ttml_convent_to_lrc
NOT_INCLUDED_FIELD = ["cover", "playlistIndex", "bit_depth", "sample_rate",
"sample_rate_kHz", "song_id", "album_id"]
class SongMetadata(BaseModel): class SongMetadata(BaseModel):
song_id: Optional[str] = None
title: Optional[str] = None title: Optional[str] = None
artist: Optional[str] = None artist: Optional[str] = None
album_id: Optional[str] = None
album_artist: Optional[str] = None album_artist: Optional[str] = None
album: Optional[str] = None album: Optional[str] = None
composer: Optional[str] = None composer: Optional[str] = None
@ -31,9 +26,6 @@ class SongMetadata(BaseModel):
upc: Optional[str] = None upc: Optional[str] = None
isrc: Optional[str] = None isrc: Optional[str] = None
playlistIndex: Optional[int] = None playlistIndex: Optional[int] = None
bit_depth: Optional[int] = None
sample_rate: Optional[int] = None
sample_rate_kHz: Optional[str] = None
def to_itags_params(self, embed_metadata: list[str]): def to_itags_params(self, embed_metadata: list[str]):
tags = [] tags = []
@ -41,7 +33,9 @@ class SongMetadata(BaseModel):
if not value: if not value:
continue continue
if key in embed_metadata and value: if key in embed_metadata and value:
if key in NOT_INCLUDED_FIELD: if "playlist" in key:
continue
if key == "cover":
continue continue
if key == "lyrics": if key == "lyrics":
lrc = ttml_convent_to_lrc(value) lrc = ttml_convent_to_lrc(value)
@ -68,8 +62,7 @@ class SongMetadata(BaseModel):
record_company=song_data.relationships.albums.data[0].attributes.recordLabel, record_company=song_data.relationships.albums.data[0].attributes.recordLabel,
upc=song_data.relationships.albums.data[0].attributes.upc, upc=song_data.relationships.albums.data[0].attributes.upc,
isrc=song_data.attributes.isrc, isrc=song_data.attributes.isrc,
rtng=1 if song_data.attributes.contentRating and song_data.attributes.contentRating == 'explicit' else 0, rtng=1 if song_data.attributes.contentRating and song_data.attributes.contentRating == 'explicit' else 0
song_id=song_data.id, album_id=song_data.relationships.albums.data[0].id
) )
def set_lyrics(self, lyrics: str): def set_lyrics(self, lyrics: str):
@ -80,8 +73,3 @@ class SongMetadata(BaseModel):
def set_playlist_index(self, index: int): def set_playlist_index(self, index: int):
self.playlistIndex = index self.playlistIndex = index
def set_bit_depth_and_sample_rate(self, bit_depth: int, sample_rate: int):
self.bit_depth = bit_depth
self.sample_rate = sample_rate
self.sample_rate_kHz = str(sample_rate / 1000)

@ -33,7 +33,7 @@ async def get_available_codecs(m3u8_url: str) -> Tuple[list[str], list[str]]:
async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata, async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata,
codec_priority: list[str], alternative_codec: bool = False) -> Tuple[str, list[str], str, Optional[int], Optional[int]]: codec_priority: list[str], alternative_codec: bool = False) -> Tuple[str, list[str], str]:
parsed_m3u8 = m3u8.loads(await download_m3u8(m3u8_url), uri=m3u8_url) parsed_m3u8 = m3u8.loads(await download_m3u8(m3u8_url), uri=m3u8_url)
specifyPlaylist = find_best_codec(parsed_m3u8, codec) specifyPlaylist = find_best_codec(parsed_m3u8, codec)
if not specifyPlaylist and alternative_codec: if not specifyPlaylist and alternative_codec:
@ -65,12 +65,7 @@ async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata,
for key in skds: for key in skds:
if key.endswith(key_suffix) or key.endswith(CodecKeySuffix.KeySuffixDefault): if key.endswith(key_suffix) or key.endswith(CodecKeySuffix.KeySuffixDefault):
keys.append(key) keys.append(key)
if codec == Codec.ALAC: return stream.segment_map[0].absolute_uri, keys, selected_codec
sample_rate, bit_depth = specifyPlaylist.media[0].extras.values()
sample_rate, bit_depth = int(sample_rate), int(bit_depth)
else:
sample_rate, bit_depth = None, None
return stream.segment_map[0].absolute_uri, keys, selected_codec, bit_depth, sample_rate
async def extract_song(raw_song: bytes, codec: str) -> SongInfo: async def extract_song(raw_song: bytes, codec: str) -> SongInfo:

@ -34,7 +34,7 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config
song_metadata.set_playlist_index(playlist.songIdIndexMapping.get(song.id)) song_metadata.set_playlist_index(playlist.songIdIndexMapping.get(song.id))
logger.info(f"Ripping song: {song_metadata.artist} - {song_metadata.title}") logger.info(f"Ripping song: {song_metadata.artist} - {song_metadata.title}")
if not await exist_on_storefront_by_song_id(song.id, song.storefront, auth_params.storefront, if not await exist_on_storefront_by_song_id(song.id, song.storefront, auth_params.storefront,
auth_params.anonymousAccessToken, config.region.language): auth_params.anonymousAccessToken, config.region.language):
logger.error( logger.error(
f"Unable to download song {song_metadata.artist} - {song_metadata.title}. " f"Unable to download song {song_metadata.artist} - {song_metadata.title}. "
f"This song does not exist in storefront {auth_params.storefront.upper()} " f"This song does not exist in storefront {auth_params.storefront.upper()} "
@ -67,8 +67,7 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config
f"Failed to download song: {song_metadata.artist} - {song_metadata.title}. Audio does not exist") f"Failed to download song: {song_metadata.artist} - {song_metadata.title}. Audio does not exist")
return return
if not specified_m3u8 and not song_data.attributes.extendedAssetUrls.enhancedHls: if not specified_m3u8 and not song_data.attributes.extendedAssetUrls.enhancedHls:
logger.error( logger.error(f"Failed to download song: {song_metadata.artist} - {song_metadata.title}. Lossless audio does not exist")
f"Failed to download song: {song_metadata.artist} - {song_metadata.title}. Lossless audio does not exist")
return return
if not specified_m3u8 and config.download.getM3u8FromDevice: if not specified_m3u8 and config.download.getM3u8FromDevice:
device_m3u8 = await device.get_m3u8(song.id) device_m3u8 = await device.get_m3u8(song.id)
@ -76,17 +75,14 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config
specified_m3u8 = device_m3u8 specified_m3u8 = device_m3u8
logger.info(f"Use m3u8 from device for song: {song_metadata.artist} - {song_metadata.title}") logger.info(f"Use m3u8 from device for song: {song_metadata.artist} - {song_metadata.title}")
if specified_m3u8: if specified_m3u8:
song_uri, keys, codec_id, bit_depth, sample_rate = await extract_media( song_uri, keys, codec_id = await extract_media(specified_m3u8, codec, song_metadata,
specified_m3u8, codec, song_metadata, config.download.codecPriority, config.download.codecAlternative) config.download.codecPriority,
config.download.codecAlternative)
else: else:
song_uri, keys, codec_id, bit_depth, sample_rate = await extract_media( song_uri, keys, codec_id = await extract_media(song_data.attributes.extendedAssetUrls.enhancedHls, codec,
song_data.attributes.extendedAssetUrls.enhancedHls, codec, song_metadata, song_metadata,
config.download.codecPriority, config.download.codecAlternative) config.download.codecPriority,
if all([bool(bit_depth), bool(sample_rate)]): config.download.codecAlternative)
song_metadata.set_bit_depth_and_sample_rate(bit_depth, sample_rate)
if not force_save and check_song_exists(song_metadata, config.download, codec, playlist):
logger.info(f"Song: {song_metadata.artist} - {song_metadata.title} already exists")
return
logger.info(f"Downloading song: {song_metadata.artist} - {song_metadata.title}") logger.info(f"Downloading song: {song_metadata.artist} - {song_metadata.title}")
codec = get_codec_from_codec_id(codec_id) codec = get_codec_from_codec_id(codec_id)
raw_song = await download_song(song_uri) raw_song = await download_song(song_uri)
@ -123,12 +119,10 @@ async def rip_album(album: Album, auth_params: GlobalAuthParams, codec: str, con
album_info = await get_album_info(album.id, auth_params.anonymousAccessToken, album.storefront, album_info = await get_album_info(album.id, auth_params.anonymousAccessToken, album.storefront,
config.region.language) config.region.language)
logger.info(f"Ripping Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}") logger.info(f"Ripping Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}")
if not await exist_on_storefront_by_album_id(album.id, album.storefront, auth_params.storefront, if not await exist_on_storefront_by_album_id(album.id, album.storefront, auth_params.storefront, auth_params.anonymousAccessToken, config.region.language):
auth_params.anonymousAccessToken, config.region.language): logger.error(f"Unable to download album {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}. "
logger.error( f"This album does not exist in storefront {auth_params.storefront.upper()} "
f"Unable to download album {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}. " f"and no device is available to decrypt it")
f"This album does not exist in storefront {auth_params.storefront.upper()} "
f"and no device is available to decrypt it")
return return
async with asyncio.TaskGroup() as tg: async with asyncio.TaskGroup() as tg:
for track in album_info.data[0].relationships.tracks.data: for track in album_info.data[0].relationships.tracks.data:

@ -113,10 +113,6 @@ def get_valid_filename(filename: str):
return "".join(i for i in filename if i not in ["<", ">", ":", "\"", "/", "\\", "|", "?", "*"]) return "".join(i for i in filename if i not in ["<", ">", ":", "\"", "/", "\\", "|", "?", "*"])
def get_valid_dir_name(dirname: str):
return regex.sub(r"\.+$", "", get_valid_filename(dirname))
def get_codec_from_codec_id(codec_id: str) -> str: def get_codec_from_codec_id(codec_id: str) -> str:
codecs = [Codec.AC3, Codec.EC3, Codec.AAC, Codec.ALAC, Codec.AAC_BINAURAL, Codec.AAC_DOWNMIX] codecs = [Codec.AC3, Codec.EC3, Codec.AAC, Codec.ALAC, Codec.AAC_BINAURAL, Codec.AAC_DOWNMIX]
for codec in codecs: for codec in codecs:
@ -150,14 +146,6 @@ def playlist_metadata_to_params(playlist: PlaylistInfo):
"playlistCuratorName": playlist.data[0].attributes.curatorName} "playlistCuratorName": playlist.data[0].attributes.curatorName}
def get_audio_info_str(metadata, codec: str, config: Download):
if all([bool(metadata.bit_depth), bool(metadata.sample_rate), bool(metadata.sample_rate_kHz)]):
return config.audioInfoFormat.format(bit_depth=metadata.bit_depth, sample_rate=metadata.sample_rate,
sample_rate_kHz=metadata.sample_rate_kHz, codec=codec)
else:
return ""
def get_path_safe_dict(param: dict): def get_path_safe_dict(param: dict):
new_param = deepcopy(param) new_param = deepcopy(param)
for key, val in new_param.items(): for key, val in new_param.items():
@ -171,17 +159,17 @@ def get_song_name_and_dir_path(codec: str, config: Download, metadata, playlist:
safe_meta = get_path_safe_dict(metadata.model_dump()) safe_meta = get_path_safe_dict(metadata.model_dump())
safe_pl_meta = get_path_safe_dict(playlist_metadata_to_params(playlist)) safe_pl_meta = get_path_safe_dict(playlist_metadata_to_params(playlist))
song_name = config.playlistSongNameFormat.format(codec=codec, playlistSongIndex=metadata.playlistIndex, song_name = config.playlistSongNameFormat.format(codec=codec, playlistSongIndex=metadata.playlistIndex,
audio_info=get_audio_info_str(metadata, codec, config),
**safe_meta) **safe_meta)
dir_path = Path(config.playlistDirPathFormat.format(codec=codec, **safe_meta, **safe_pl_meta)) dir_path = Path(config.playlistDirPathFormat.format(codec=codec,
**safe_meta,
**safe_pl_meta))
else: else:
safe_meta = get_path_safe_dict(metadata.model_dump()) safe_meta = get_path_safe_dict(metadata.model_dump())
song_name = config.songNameFormat.format(codec=codec, audio_info=get_audio_info_str(metadata, codec, config), song_name = config.songNameFormat.format(codec=codec, **safe_meta)
**safe_meta)
dir_path = Path(config.dirPathFormat.format(codec=codec, **safe_meta)) dir_path = Path(config.dirPathFormat.format(codec=codec, **safe_meta))
if sys.platform == "win32": if sys.platform == "win32":
song_name = get_valid_filename(song_name) song_name = get_valid_filename(song_name)
dir_path = Path(*[get_valid_dir_name(part) if ":\\" not in part else part for part in dir_path.parts]) dir_path = Path(*[get_valid_filename(part) if ":\\" not in part else part for part in dir_path.parts])
return song_name, dir_path return song_name, dir_path

Loading…
Cancel
Save