diff --git a/pyproject.toml b/pyproject.toml index 92c76d9..9ef9be1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ async-lru = "^2.0.4" winloop = {version = "^0.1.3", platform = "win32"} uvloop = [{version = "^0.19.0", platform = "linux"}, {version = "^0.19.0", platform = "darwin"}] +prettytable = "^3.10.0" [build-system] requires = ["poetry-core"] diff --git a/src/cmd.py b/src/cmd.py index c955840..387e476 100644 --- a/src/cmd.py +++ b/src/cmd.py @@ -5,12 +5,15 @@ import sys from asyncio import Task from loguru import logger +from prettytable import PrettyTable from prompt_toolkit import PromptSession, print_formatted_text, ANSI from prompt_toolkit.patch_stdout import patch_stdout from src.adb import Device -from src.api import get_token, init_client_and_lock, upload_m3u8_to_api, get_song_info, get_real_url +from src.api import get_token, init_client_and_lock, get_real_url, get_album_info from src.config import Config +from src.exceptions import CodecNotFoundException +from src.quality import get_available_song_audio_quality from src.rip import rip_song, rip_album, rip_artist, rip_playlist from src.types import GlobalAuthParams from src.url import AppleMusicURL, URLType, Song @@ -53,6 +56,9 @@ class NewInteractiveShell: choices=["alac", "ec3", "aac", "aac-binaural", "aac-downmix", "ac3"], default="alac") m3u8_parser.add_argument("-f", "--force", default=False, action="store_true") + m3u8_parser.add_argument("-q", "--quality", default="", dest="quality") + quality_parser = subparser.add_parser("quality") + quality_parser.add_argument("url", type=str) subparser.add_parser("exit") logger.remove() @@ -88,6 +94,8 @@ class NewInteractiveShell: await self.do_m3u8(args.url, args.codec, args.force) case "download-from-file" | "dlf": await self.do_download_from_file(args.file, args.codec, args.force) + case "quality": + await self.do_quality(args.url) case "exit": self.loop.stop() sys.exit() @@ -141,6 +149,55 @@ class NewInteractiveShell: self.tasks.append(task) task.add_done_callback(self.tasks.remove) + async def do_quality(self, raw_url: str): + url = AppleMusicURL.parse_url(raw_url) + if not url: + real_url = await get_real_url(raw_url) + url = AppleMusicURL.parse_url(real_url) + if not url: + logger.error("Illegal URL!") + return + logger.info(f"Getting data for {url.type} id {url.id}") + available_device = await self._get_available_device(url.storefront) + global_auth_param = GlobalAuthParams.from_auth_params_and_token(available_device.get_auth_params(), + self.anonymous_access_token) + match url.type: + case URLType.Song: + try: + song_metadata, audio_qualities = await get_available_song_audio_quality(url, self.config, + global_auth_param, + available_device) + except CodecNotFoundException: + return + table = PrettyTable( + field_names=["Codec ID", "Codec", "Bitrate", "Average Bitrate", "Channels", "Sample Rate", + "Bit Depth"]) + audio_qualities.sort(key=lambda x: x.bitrate, reverse=True) + table.add_rows([list(audio_quality.model_dump().values()) for audio_quality in audio_qualities]) + print_formatted_text( + f"Available audio qualities for song: {song_metadata.artist} - {song_metadata.title}:") + print_formatted_text(table) + case URLType.Album: + album_info = await get_album_info(url.id, global_auth_param.anonymousAccessToken, url.storefront, + self.config.region.language) + for track in album_info.data[0].relationships.tracks.data: + song = Song(id=track.id, storefront=url.storefront, url="", type=URLType.Song) + try: + song_metadata, audio_qualities = await get_available_song_audio_quality(song, self.config, + global_auth_param, + available_device) + except CodecNotFoundException: + return + table = PrettyTable( + field_names=["Codec ID", "Codec", "Bitrate", "Average Bitrate", "Channels", "Sample Rate", + "Bit Depth"]) + table.add_rows([list(audio_quality.model_dump().values()) for audio_quality in audio_qualities]) + print_formatted_text( + f"Available audio qualities for song: {song_metadata.artist} - {song_metadata.title}:") + print_formatted_text(table) + case _: + logger.error("Unsupported link!") + async def _get_available_device(self, storefront: str): devices = self.storefront_device_mapping.get(storefront) if not devices: diff --git a/src/quality.py b/src/quality.py new file mode 100644 index 0000000..6e6d320 --- /dev/null +++ b/src/quality.py @@ -0,0 +1,69 @@ +from typing import Optional + +import m3u8 +from loguru import logger +from pydantic import BaseModel + +from src.api import get_song_info, get_m3u8_from_api, download_m3u8 +from src.config import Config, Device +from src.exceptions import CodecNotFoundException +from src.metadata import SongMetadata +from src.types import GlobalAuthParams +from src.url import Song +from src.utils import get_codec_from_codec_id + + +async def get_available_audio_quality(m3u8_url: str): + parsed_m3u8 = m3u8.loads(await download_m3u8(m3u8_url), uri=m3u8_url) + result = [] + for playlist in parsed_m3u8.playlists: + if get_codec_from_codec_id(playlist.stream_info.audio): + result.append(AudioQuality(codec_id=playlist.stream_info.audio, + codec=get_codec_from_codec_id(playlist.stream_info.audio), + bitrate=playlist.stream_info.bandwidth, + average_bitrate=playlist.stream_info.average_bandwidth, + channels=playlist.media[0].channels, + sample_rate=playlist.media[0].extras.get("sample_rate", None), + bit_depth=playlist.media[0].extras.get("bit_depth", None))) + return result + + +class AudioQuality(BaseModel): + codec_id: str + codec: str + bitrate: int + average_bitrate: int + channels: str + sample_rate: Optional[int] = None + bit_depth: Optional[int] = None + + +async def get_available_song_audio_quality(song: Song, config: Config, auth_params: GlobalAuthParams, + device: Device) -> tuple[SongMetadata, list[AudioQuality]]: + specified_m3u8 = None + token = auth_params.anonymousAccessToken + song_data = await get_song_info(song.id, token, song.storefront, config.region.language) + song_metadata = SongMetadata.parse_from_song_data(song_data) + if config.m3u8Api.enable: + m3u8_url = await get_m3u8_from_api(config.m3u8Api.endpoint, song.id, config.m3u8Api.enable) + if m3u8_url: + specified_m3u8 = m3u8_url + logger.info(f"Use m3u8 from API for song: {song_metadata.artist} - {song_metadata.title}") + if not song_data.attributes.extendedAssetUrls: + logger.error( + f"Failed to get audio quality fo song: {song_metadata.artist} - {song_metadata.title}. Audio does not exist") + raise CodecNotFoundException + if not song_data.attributes.extendedAssetUrls.enhancedHls: + logger.error( + f"Failed to get audio quality for song: {song_metadata.artist} - {song_metadata.title}. Lossless audio does not exist") + raise CodecNotFoundException + if not specified_m3u8 and config.download.getM3u8FromDevice: + device_m3u8 = await device.get_m3u8(song.id) + if device_m3u8: + specified_m3u8 = device_m3u8 + logger.info(f"Use m3u8 from device for song: {song_metadata.artist} - {song_metadata.title}") + if specified_m3u8: + audio_qualities = await get_available_audio_quality(specified_m3u8) + else: + audio_qualities = await get_available_audio_quality(song_data.attributes.extendedAssetUrls.enhancedHls) + return song_metadata, audio_qualities