From 0d9810946802a343d5a3fc641fde2ade6e2f91cd Mon Sep 17 00:00:00 2001 From: WorldObservationLog Date: Wed, 29 May 2024 16:40:29 +0800 Subject: [PATCH] feat: support audio_info format --- config.example.toml | 16 ++++++++++++++-- src/config.py | 1 + src/metadata.py | 14 +++++++++++--- src/mp4.py | 9 +++++++-- src/rip.py | 32 +++++++++++++++++++------------- src/utils.py | 12 ++++++++++-- 6 files changed, 62 insertions(+), 22 deletions(-) diff --git a/config.example.toml b/config.example.toml index c8f8991..b0fd969 100644 --- a/config.example.toml +++ b/config.example.toml @@ -44,10 +44,22 @@ codecPriority = ["alac", "ec3", "ac3", "aac"] # Encapsulate Atmos(ec-3/ac-3) as M4A and write the song metadata atmosConventToM4a = true # Follow the Python Format format (https://docs.python.org/3/library/string.html#formatstrings) +# Write the audio information to the songNameFormat and playlistSongNameFormat +# Only support alac codec +# Available values: bit_depth, sample_rate, sample_rate_kHz, codec +# This feature may slow down the speed of checking for existing songs +# For example: +# audioInfoFormat = " [{codec}][{bit_depth}bit][{sample_rate_kHz}kHz]" +# songNameFormat = "{disk}-{tracknum:02d} {title}{audio_info}" +# When transcribing audio with alac codec, the transcribed file name is: +# 1-01 名もなき何もかも [ALAC][16bit][44.1kHz] +# When transcribing audio with other codecs, the transcribed file name is: +# 1-01 名もなき何もかも +audioInfoFormat = "" # Available values: # title, artist, album, album_artist, composer, # genre, created, track, tracknum, disk, -# record_company, upc, isrc, copyright, codec +# record_company, upc, isrc, copyright, codec, audio_info songNameFormat = "{disk}-{tracknum:02d} {title}" # Ditto dirPathFormat = "downloads/{album_artist}/{album}" @@ -60,7 +72,7 @@ playlistDirPathFormat = "downloads/playlists/{playlistName}" # Available values: # title, artist, album, album_artist, composer, # genre, created, track, tracknum, disk, -# record_company, upc, isrc, copyright, +# record_company, upc, isrc, copyright, audio_info # playlistName, playlistCuratorName, playlistSongIndex, codec playlistSongNameFormat = "{playlistSongIndex:02d}. {artist} - {title}" # Save lyrics as .lrc file diff --git a/src/config.py b/src/config.py index f640d0d..22f67f5 100644 --- a/src/config.py +++ b/src/config.py @@ -30,6 +30,7 @@ class Download(BaseModel): codecAlternative: bool codecPriority: list[str] atmosConventToM4a: bool + audioInfoFormat: str songNameFormat: str dirPathFormat: str playlistDirPathFormat: str diff --git a/src/metadata.py b/src/metadata.py index 16b3770..0b9bac9 100644 --- a/src/metadata.py +++ b/src/metadata.py @@ -6,6 +6,8 @@ from src.api import get_cover from src.models.song_data import Datum from src.utils import ttml_convent_to_lrc +NOT_INCLUDED_FIELD = ["cover", "playlistIndex", "bit_depth", "sample_rate", "sample_rate_kHz"] + class SongMetadata(BaseModel): title: Optional[str] = None @@ -26,6 +28,9 @@ class SongMetadata(BaseModel): upc: Optional[str] = None isrc: Optional[str] = None playlistIndex: Optional[int] = None + bit_depth: Optional[int] = None + sample_rate: Optional[int] = None + sample_rate_kHz: Optional[str] = None def to_itags_params(self, embed_metadata: list[str]): tags = [] @@ -33,9 +38,7 @@ class SongMetadata(BaseModel): if not value: continue if key in embed_metadata and value: - if "playlist" in key: - continue - if key == "cover": + if key in NOT_INCLUDED_FIELD: continue if key == "lyrics": lrc = ttml_convent_to_lrc(value) @@ -73,3 +76,8 @@ class SongMetadata(BaseModel): def set_playlist_index(self, index: int): self.playlistIndex = index + + def set_bit_depth_and_sample_rate(self, bit_depth: int, sample_rate: int): + self.bit_depth = bit_depth + self.sample_rate = sample_rate + self.sample_rate_kHz = str(sample_rate / 1000) diff --git a/src/mp4.py b/src/mp4.py index 3796443..2f08feb 100644 --- a/src/mp4.py +++ b/src/mp4.py @@ -33,7 +33,7 @@ async def get_available_codecs(m3u8_url: str) -> Tuple[list[str], list[str]]: async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata, - codec_priority: list[str], alternative_codec: bool = False) -> Tuple[str, list[str], str]: + codec_priority: list[str], alternative_codec: bool = False) -> Tuple[str, list[str], str, Optional[int], Optional[int]]: parsed_m3u8 = m3u8.loads(await download_m3u8(m3u8_url), uri=m3u8_url) specifyPlaylist = find_best_codec(parsed_m3u8, codec) if not specifyPlaylist and alternative_codec: @@ -65,7 +65,12 @@ async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata, for key in skds: if key.endswith(key_suffix) or key.endswith(CodecKeySuffix.KeySuffixDefault): keys.append(key) - return stream.segment_map[0].absolute_uri, keys, selected_codec + if codec == Codec.ALAC: + sample_rate, bit_depth = specifyPlaylist.media[0].extras.values() + sample_rate, bit_depth = int(sample_rate), int(bit_depth) + else: + sample_rate, bit_depth = None, None + return stream.segment_map[0].absolute_uri, keys, selected_codec, bit_depth, sample_rate async def extract_song(raw_song: bytes, codec: str) -> SongInfo: diff --git a/src/rip.py b/src/rip.py index 0fa052c..a1110e2 100644 --- a/src/rip.py +++ b/src/rip.py @@ -34,7 +34,7 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config song_metadata.set_playlist_index(playlist.songIdIndexMapping.get(song.id)) logger.info(f"Ripping song: {song_metadata.artist} - {song_metadata.title}") if not await exist_on_storefront_by_song_id(song.id, song.storefront, auth_params.storefront, - auth_params.anonymousAccessToken, config.region.language): + auth_params.anonymousAccessToken, config.region.language): logger.error( f"Unable to download song {song_metadata.artist} - {song_metadata.title}. " f"This song does not exist in storefront {auth_params.storefront.upper()} " @@ -67,7 +67,8 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config f"Failed to download song: {song_metadata.artist} - {song_metadata.title}. Audio does not exist") return if not specified_m3u8 and not song_data.attributes.extendedAssetUrls.enhancedHls: - logger.error(f"Failed to download song: {song_metadata.artist} - {song_metadata.title}. Lossless audio does not exist") + logger.error( + f"Failed to download song: {song_metadata.artist} - {song_metadata.title}. Lossless audio does not exist") return if not specified_m3u8 and config.download.getM3u8FromDevice: device_m3u8 = await device.get_m3u8(song.id) @@ -75,14 +76,17 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config specified_m3u8 = device_m3u8 logger.info(f"Use m3u8 from device for song: {song_metadata.artist} - {song_metadata.title}") if specified_m3u8: - song_uri, keys, codec_id = await extract_media(specified_m3u8, codec, song_metadata, - config.download.codecPriority, - config.download.codecAlternative) + song_uri, keys, codec_id, bit_depth, sample_rate = await extract_media( + specified_m3u8, codec, song_metadata, config.download.codecPriority, config.download.codecAlternative) else: - song_uri, keys, codec_id = await extract_media(song_data.attributes.extendedAssetUrls.enhancedHls, codec, - song_metadata, - config.download.codecPriority, - config.download.codecAlternative) + song_uri, keys, codec_id, bit_depth, sample_rate = await extract_media( + song_data.attributes.extendedAssetUrls.enhancedHls, codec, song_metadata, + config.download.codecPriority, config.download.codecAlternative) + if all([bool(bit_depth), bool(sample_rate)]): + song_metadata.set_bit_depth_and_sample_rate(bit_depth, sample_rate) + if not force_save and check_song_exists(song_metadata, config.download, codec, playlist): + logger.info(f"Song: {song_metadata.artist} - {song_metadata.title} already exists") + return logger.info(f"Downloading song: {song_metadata.artist} - {song_metadata.title}") codec = get_codec_from_codec_id(codec_id) raw_song = await download_song(song_uri) @@ -119,10 +123,12 @@ async def rip_album(album: Album, auth_params: GlobalAuthParams, codec: str, con album_info = await get_album_info(album.id, auth_params.anonymousAccessToken, album.storefront, config.region.language) logger.info(f"Ripping Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}") - if not await exist_on_storefront_by_album_id(album.id, album.storefront, auth_params.storefront, auth_params.anonymousAccessToken, config.region.language): - logger.error(f"Unable to download album {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}. " - f"This album does not exist in storefront {auth_params.storefront.upper()} " - f"and no device is available to decrypt it") + if not await exist_on_storefront_by_album_id(album.id, album.storefront, auth_params.storefront, + auth_params.anonymousAccessToken, config.region.language): + logger.error( + f"Unable to download album {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}. " + f"This album does not exist in storefront {auth_params.storefront.upper()} " + f"and no device is available to decrypt it") return async with asyncio.TaskGroup() as tg: for track in album_info.data[0].relationships.tracks.data: diff --git a/src/utils.py b/src/utils.py index 5d86a10..e9ae33a 100644 --- a/src/utils.py +++ b/src/utils.py @@ -146,6 +146,14 @@ def playlist_metadata_to_params(playlist: PlaylistInfo): "playlistCuratorName": playlist.data[0].attributes.curatorName} +def get_audio_info_str(metadata, codec: str, config: Download): + if all([bool(metadata.bit_depth), bool(metadata.sample_rate), bool(metadata.sample_rate_kHz)]): + return config.audioInfoFormat.format(bit_depth=metadata.bit_depth, sample_rate=metadata.sample_rate, + sample_rate_kHz=metadata.sample_rate_kHz, codec=codec) + else: + return "" + + def get_path_safe_dict(param: dict): new_param = deepcopy(param) for key, val in new_param.items(): @@ -159,13 +167,13 @@ def get_song_name_and_dir_path(codec: str, config: Download, metadata, playlist: safe_meta = get_path_safe_dict(metadata.model_dump()) safe_pl_meta = get_path_safe_dict(playlist_metadata_to_params(playlist)) song_name = config.playlistSongNameFormat.format(codec=codec, playlistSongIndex=metadata.playlistIndex, - **safe_meta) + audio_info=get_audio_info_str(metadata, codec, config), **safe_meta) dir_path = Path(config.playlistDirPathFormat.format(codec=codec, **safe_meta, **safe_pl_meta)) else: safe_meta = get_path_safe_dict(metadata.model_dump()) - song_name = config.songNameFormat.format(codec=codec, **safe_meta) + song_name = config.songNameFormat.format(codec=codec, audio_info=get_audio_info_str(metadata, codec, config), **safe_meta) dir_path = Path(config.dirPathFormat.format(codec=codec, **safe_meta)) if sys.platform == "win32": song_name = get_valid_filename(song_name)