diff --git a/src/api.py b/src/api.py index a427595..40ffe09 100644 --- a/src/api.py +++ b/src/api.py @@ -1,5 +1,6 @@ import asyncio import logging +from io import BytesIO from ssl import SSLError from typing import Optional @@ -11,6 +12,7 @@ from tenacity import retry, retry_if_exception_type, stop_after_attempt, before_ from src.models import * from src.models.song_data import Datum +from src.status import BaseStatus, StatusCode client: httpx.AsyncClient download_lock: asyncio.Semaphore @@ -78,9 +80,15 @@ async def get_token(): @retry(retry=retry_if_exception_type((httpx.HTTPError, SSLError, FileNotFoundError)), wait=wait_random_exponential(multiplier=1, max=60), stop=stop_after_attempt(retry_times), before_sleep=before_sleep_log(logger, logging.WARN)) -async def download_song(url: str) -> bytes: +async def download_song(url: str, status: BaseStatus) -> bytes: async with download_lock: - return (await client.get(url)).content + status.set_status(StatusCode.Downloading) + result = BytesIO() + async with client.stream("GET", url) as resp: + async for chunk in resp.aiter_bytes(): + result.write(chunk) + status.set_progress("download", resp.num_bytes_downloaded, int(resp.headers["Content-Length"])) + return result.getvalue() @alru_cache diff --git a/src/cmd.py b/src/cmd.py index c955840..11a7806 100644 --- a/src/cmd.py +++ b/src/cmd.py @@ -3,6 +3,7 @@ import asyncio import random import sys from asyncio import Task +from typing import Literal from loguru import logger from prompt_toolkit import PromptSession, print_formatted_text, ANSI @@ -12,6 +13,7 @@ from src.adb import Device from src.api import get_token, init_client_and_lock, upload_m3u8_to_api, get_song_info, get_real_url from src.config import Config from src.rip import rip_song, rip_album, rip_artist, rip_playlist +from src.status import LogStatus, BaseStatus from src.types import GlobalAuthParams from src.url import AppleMusicURL, URLType, Song from src.utils import get_song_id_from_m3u8 @@ -105,17 +107,25 @@ class NewInteractiveShell: self.anonymous_access_token) match url.type: case URLType.Song: + status = LogStatus(status_type=URLType.Song) task = self.loop.create_task( - rip_song(url, global_auth_param, codec, self.config, available_device, force_download)) + rip_song(url, global_auth_param, codec, self.config, available_device, status, + force_save=force_download)) case URLType.Album: - task = self.loop.create_task(rip_album(url, global_auth_param, codec, self.config, available_device, - force_download)) + status = LogStatus(status_type=URLType.Album) + task = self.loop.create_task( + rip_album(url, global_auth_param, codec, self.config, available_device, status, + force_save=force_download)) case URLType.Artist: - task = self.loop.create_task(rip_artist(url, global_auth_param, codec, self.config, available_device, - force_download, include)) + status = LogStatus(status_type=URLType.Artist) + task = self.loop.create_task( + rip_artist(url, global_auth_param, codec, self.config, available_device, status, + force_save=force_download, include_participate_in_works=include)) case URLType.Playlist: - task = self.loop.create_task(rip_playlist(url, global_auth_param, codec, self.config, available_device, - force_download)) + status = LogStatus(status_type=URLType.Playlist) + task = self.loop.create_task( + rip_playlist(url, global_auth_param, codec, self.config, available_device, status, + force_save=force_download)) case _: logger.error("Unsupported URLType") return diff --git a/src/decrypt.py b/src/decrypt.py index 233f258..42ebb5c 100644 --- a/src/decrypt.py +++ b/src/decrypt.py @@ -17,8 +17,9 @@ retry_count = {} @retry(retry=retry_if_exception_type(RetryableDecryptException), stop=stop_after_attempt(3), before_sleep=before_sleep_log(logger, logging.WARN)) @timeit -async def decrypt(info: SongInfo, keys: list[str], manifest: Datum, device: Device | HyperDecryptDevice) -> bytes: +async def decrypt(info: SongInfo, keys: list[str], manifest: Datum, device: Device | HyperDecryptDevice, status: BaseStatus) -> bytes: async with device.decryptLock: + status.set_status(StatusCode.Decrypting) if isinstance(device, HyperDecryptDevice): logger.info(f"Using hyperDecryptDevice {device.serial} to decrypt song: {manifest.attributes.artistName} - {manifest.attributes.name}") else: @@ -31,7 +32,9 @@ async def decrypt(info: SongInfo, keys: list[str], manifest: Datum, device: Devi raise RetryableDecryptException decrypted = [] last_index = 255 + now = 0 for sample in info.samples: + status.set_progress("decrypt", now, len(info.samples)) if last_index != sample.descIndex: if len(decrypted) != 0: writer.write(bytes([0, 0, 0, 0])) diff --git a/src/rip.py b/src/rip.py index cbf9cd0..194f71d 100644 --- a/src/rip.py +++ b/src/rip.py @@ -1,6 +1,7 @@ import asyncio import random import subprocess +from typing import Optional from loguru import logger @@ -14,6 +15,7 @@ from src.metadata import SongMetadata from src.models import PlaylistInfo from src.mp4 import extract_media, extract_song, encapsulate, write_metadata, fix_encapsulate, fix_esds_box from src.save import save +from src.status import BaseStatus, StatusCode, ErrorCode, WarningCode from src.types import GlobalAuthParams, Codec from src.url import Song, Album, URLType, Artist, Playlist from src.utils import check_song_exists, if_raw_atmos, playlist_write_song_index, get_codec_from_codec_id, timeit @@ -23,51 +25,49 @@ task_lock = asyncio.Semaphore(16) @logger.catch @timeit -async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device, - force_save: bool = False, specified_m3u8: str = "", playlist: PlaylistInfo = None): +async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device, status: BaseStatus, + force_save: bool = False, specified_m3u8: str = "", playlist: PlaylistInfo = None, return_result: bool = False) -> Optional[tuple[bytes, SongMetadata, str]]: async with task_lock: - logger.debug(f"Task of song id {song.id} was created") + status.set_param(song_id=song.id) + status.set_status(StatusCode.Processing) token = auth_params.anonymousAccessToken song_data = await get_song_info(song.id, token, song.storefront, config.region.language) song_metadata = SongMetadata.parse_from_song_data(song_data) + status.set_param(artist=song_metadata.artist, title=song_metadata.title, + song_storefront=song.storefront, storefront=auth_params.storefront) if playlist: song_metadata.set_playlist_index(playlist.songIdIndexMapping.get(song.id)) - logger.info(f"Ripping song: {song_metadata.artist} - {song_metadata.title}") + status.set_status(StatusCode.Parsing) if not await exist_on_storefront_by_song_id(song.id, song.storefront, auth_params.storefront, - auth_params.anonymousAccessToken, config.region.language): - logger.error( - f"Unable to download song {song_metadata.artist} - {song_metadata.title}. " - f"This song does not exist in storefront {auth_params.storefront.upper()} " - f"and no device is available to decrypt it") + auth_params.anonymousAccessToken, config.region.language): + status.set_status(ErrorCode.NotExistInStorefront) return - if not force_save and check_song_exists(song_metadata, config.download, codec, playlist): - logger.info(f"Song: {song_metadata.artist} - {song_metadata.title} already exists") + if not force_save and check_song_exists(song_metadata, config.download, codec, playlist) and not return_result: + status.set_status(StatusCode.AlreadyExist) return await song_metadata.get_cover(config.download.coverFormat, config.download.coverSize) if song_data.attributes.hasTimeSyncedLyrics: if song.storefront.upper() != auth_params.storefront.upper(): - logger.warning(f"No account is available for getting lyrics of storefront {song.storefront.upper()}. " - f"Use storefront {auth_params.storefront.upper()} to get lyrics") + status.set_warning(WarningCode.NoAvailableAccountForLyrics) lyrics = await get_song_lyrics(song.id, auth_params.storefront, auth_params.accountAccessToken, auth_params.dsid, auth_params.accountToken, config.region.language) if lyrics: song_metadata.lyrics = lyrics else: - logger.warning(f"Unable to get lyrics of song: {song_metadata.artist} - {song_metadata.title}") + status.set_warning(WarningCode.UnableGetLyrics) if config.m3u8Api.enable and codec == Codec.ALAC and not specified_m3u8: m3u8_url = await get_m3u8_from_api(config.m3u8Api.endpoint, song.id, config.m3u8Api.enable) if m3u8_url: specified_m3u8 = m3u8_url logger.info(f"Use m3u8 from API for song: {song_metadata.artist} - {song_metadata.title}") elif not m3u8_url and config.m3u8Api.force: - logger.error(f"Failed to get m3u8 from API for song: {song_metadata.artist} - {song_metadata.title}") + status.set_error(ErrorCode.ForceModeM3U8NotExist) return if not song_data.attributes.extendedAssetUrls: - logger.error( - f"Failed to download song: {song_metadata.artist} - {song_metadata.title}. Audio does not exist") + status.set_error(ErrorCode.AudioNotExist) return if not specified_m3u8 and not song_data.attributes.extendedAssetUrls.enhancedHls: - logger.error(f"Failed to download song: {song_metadata.artist} - {song_metadata.title}. Lossless audio does not exist") + status.set_error(ErrorCode.LosslessAudioNotExist) return if not specified_m3u8: device_m3u8 = await device.get_m3u8(song.id) @@ -83,89 +83,103 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config song_metadata, config.download.codecPriority, config.download.codecAlternative) - logger.info(f"Downloading song: {song_metadata.artist} - {song_metadata.title}") + status.set_param(codec=codec_id) codec = get_codec_from_codec_id(codec_id) - raw_song = await download_song(song_uri) + raw_song = await download_song(song_uri, status) song_info = await extract_song(raw_song, codec) if device.hyperDecryptDevices: if all([hyper_device.decryptLock.locked() for hyper_device in device.hyperDecryptDevices]): - decrypted_song = await decrypt(song_info, keys, song_data, random.choice(device.hyperDecryptDevices)) + decrypted_song = await decrypt(song_info, keys, song_data, random.choice(device.hyperDecryptDevices), status) else: for hyperDecryptDevice in device.hyperDecryptDevices: if not hyperDecryptDevice.decryptLock.locked(): - decrypted_song = await decrypt(song_info, keys, song_data, hyperDecryptDevice) + decrypted_song = await decrypt(song_info, keys, song_data, hyperDecryptDevice, status) break else: - decrypted_song = await decrypt(song_info, keys, song_data, device) + decrypted_song = await decrypt(song_info, keys, song_data, device, status) + status.set_status(StatusCode.Saving) song = await encapsulate(song_info, decrypted_song, config.download.atmosConventToM4a) if not if_raw_atmos(codec, config.download.atmosConventToM4a): - metadata_song = await write_metadata(song, song_metadata, config.metadata.embedMetadata, config.download.coverFormat) + metadata_song = await write_metadata(song, song_metadata, config.metadata.embedMetadata, + config.download.coverFormat) song = await fix_encapsulate(metadata_song) if codec == Codec.AAC or codec == Codec.AAC_DOWNMIX or codec == Codec.AAC_BINAURAL: song = await fix_esds_box(song_info.raw, song) - filename = await save(song, codec, song_metadata, config.download, playlist) - logger.info(f"Song {song_metadata.artist} - {song_metadata.title} saved!") - if config.download.afterDownloaded: - command = config.download.afterDownloaded.format(filename=filename) - logger.info(f"Executing command: {command}") - subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + if return_result: + status.set_status(StatusCode.Done) + return song, song_metadata, codec + else: + filename = await save(song, codec, song_metadata, config.download, playlist) + status.set_status(StatusCode.Done) + if config.download.afterDownloaded: + command = config.download.afterDownloaded.format(filename=filename) + logger.info(f"Executing command: {command}") + subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) @logger.catch @timeit -async def rip_album(album: Album, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device, +async def rip_album(album: Album, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device, status: BaseStatus, force_save: bool = False): album_info = await get_album_info(album.id, auth_params.anonymousAccessToken, album.storefront, config.region.language) - logger.info(f"Ripping Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}") - if not await exist_on_storefront_by_album_id(album.id, album.storefront, auth_params.storefront, auth_params.anonymousAccessToken, config.region.language): - logger.error(f"Unable to download album {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}. " - f"This album does not exist in storefront {auth_params.storefront.upper()} " - f"and no device is available to decrypt it") + status.set_param(artist=album_info.data[0].attributes.artistName, title=album_info.data[0].attributes.name, + storefront=auth_params.storefront) + status.set_status(StatusCode.Processing) + if not await exist_on_storefront_by_album_id(album.id, album.storefront, auth_params.storefront, + auth_params.anonymousAccessToken, config.region.language): + status.set_error(ErrorCode.NotExistInStorefront) return async with asyncio.TaskGroup() as tg: for track in album_info.data[0].relationships.tracks.data: + song_status = status.new(URLType.Song) + status.children.append(song_status) song = Song(id=track.id, storefront=album.storefront, url="", type=URLType.Song) - tg.create_task(rip_song(song, auth_params, codec, config, device, force_save=force_save)) - logger.info( - f"Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name} finished ripping") + tg.create_task(rip_song(song, auth_params, codec, config, device, song_status, force_save=force_save)) + status.set_status(StatusCode.Done) @logger.catch @timeit -async def rip_playlist(playlist: Playlist, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device, +async def rip_playlist(playlist: Playlist, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device, status: BaseStatus, force_save: bool = False): playlist_info = await get_playlist_info_and_tracks(playlist.id, auth_params.anonymousAccessToken, playlist.storefront, config.region.language) playlist_info = playlist_write_song_index(playlist_info) - logger.info( - f"Ripping Playlist: {playlist_info.data[0].attributes.curatorName} - {playlist_info.data[0].attributes.name}") + status.set_param(artist=playlist_info.data[0].attributes.curatorName, title=playlist_info.data[0].attributes.name) + status.set_status(StatusCode.Processing) async with asyncio.TaskGroup() as tg: for track in playlist_info.data[0].relationships.tracks.data: + song_status = status.new(URLType.Song) + status.children.append(song_status) song = Song(id=track.id, storefront=playlist.storefront, url="", type=URLType.Song) tg.create_task( - rip_song(song, auth_params, codec, config, device, force_save=force_save, playlist=playlist_info)) - logger.info( - f"Playlist: {playlist_info.data[0].attributes.curatorName} - {playlist_info.data[0].attributes.name} finished ripping") + rip_song(song, auth_params, codec, config, device, song_status, force_save=force_save, playlist=playlist_info)) + status.set_status(StatusCode.Done) @logger.catch @timeit -async def rip_artist(artist: Artist, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device, +async def rip_artist(artist: Artist, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device, status: BaseStatus, force_save: bool = False, include_participate_in_works: bool = False): artist_info = await get_artist_info(artist.id, artist.storefront, auth_params.anonymousAccessToken, config.region.language) - logger.info(f"Ripping Artist: {artist_info.data[0].attributes.name}") + status.set_param(artist=artist_info.data[0].attributes.name) + status.set_status(StatusCode.Processing) async with asyncio.TaskGroup() as tg: if include_participate_in_works: songs = await get_songs_from_artist(artist.id, artist.storefront, auth_params.anonymousAccessToken, config.region.language) for song_url in songs: - tg.create_task(rip_song(Song.parse_url(song_url), auth_params, codec, config, device, force_save)) + song_status = status.new(URLType.Song) + status.children.append(song_status) + tg.create_task(rip_song(Song.parse_url(song_url), auth_params, codec, config, device, song_status, force_save=force_save)) else: albums = await get_albums_from_artist(artist.id, artist.storefront, auth_params.anonymousAccessToken, config.region.language) for album_url in albums: - tg.create_task(rip_album(Album.parse_url(album_url), auth_params, codec, config, device, force_save)) - logger.info(f"Artist: {artist_info.data[0].attributes.name} finished ripping") + album_status = status.new(URLType.Song) + status.children.append(album_status) + tg.create_task(rip_album(Album.parse_url(album_url), auth_params, codec, config, device, album_status, force_save=force_save)) + status.set_status(StatusCode.Done) diff --git a/src/status.py b/src/status.py new file mode 100644 index 0000000..bbb0a0f --- /dev/null +++ b/src/status.py @@ -0,0 +1,158 @@ +from copy import deepcopy +from typing import Optional, Any + +from loguru import logger + +from src.url import URLType + + +class StatusCode: + """ + For Song, available values are all. + For others, available values are Waiting, Processing, Done and Failed. + """ + Waiting = "WAITING" + Processing = "PROCESSING" + Parsing = "PARSING" + Downloading = "DOWNLOADING" + Decrypting = "DECRYPTING" + Saving = "SAVING" + Done = "Done" + AlreadyExist = "ALREADY_EXIST" + Failed = "FAILED" + + +class WarningCode: + NoAvailableAccountForLyrics = "NO_AVAILABLE_ACCOUNT_FOR_LYRICS" + UnableGetLyrics = "UNABLE_GET_LYRICS" + RetryableDecryptFailed = "RETRYABLE_DECRYPT_FAILED" + + +class ErrorCode: + NotExistInStorefront = "NOT_EXIST_IN_STOREFRONT" + ForceModeM3U8NotExist = "FORCE_MODE_M3U8_NOT_EXIST" + AudioNotExist = "AUDIO_NOT_EXIST" + LosslessAudioNotExist = "LOSSLESS_AUDIO_NOT_EXIST" + DecryptFailed = "DECRYPT_FAILED" + + +class BaseStatus: + _type: str + _current: str = StatusCode.Waiting + _status_params: dict[str, Any] = {} + _params: dict[str, Any] = {} + _warning: str = "" + _error: str = "" + children = [] + + def __init__(self, status_type: str): + self._type = status_type + + def new(self, status_type): + new_obj = deepcopy(self) + new_obj._type = status_type + new_obj._current = StatusCode + new_obj._status_params = {} + new_obj._params = {} + new_obj._warning = "" + new_obj._error = "" + new_obj.children = [] + return new_obj + + def running(self): + if self._error: + return False + if self._current == StatusCode.Waiting or self._current == StatusCode.Done or self._current == StatusCode.AlreadyExist: + return False + return True + + def set_status(self, status: str, **kwargs): + self._current = status + + def get_status(self) -> str: + return self._current + + def set_warning(self, warning: str, **kwargs): + self._warning = warning + + def get_warning(self): + return self._warning + + def set_error(self, error: str, **kwargs): + self._error = error + self._current = StatusCode.Failed + + def get_error(self): + return self._error + + def set_progress(self, key: str, now: int, total: int, **kwargs): + self._status_params[key] = {"now": now, "total": total} + + def get_progress(self, key: str) -> Optional[tuple[int, int]]: + if self._status_params.get(key): + return self._status_params[key]["now"], self._status_params[key]["total"] + return None + + def set_param(self, **kwargs): + for param in kwargs.items(): + self._params[param[0]] = param[1] + + +class LogStatus(BaseStatus): + def _get_song_name(self) -> str: + if self._params.get('title'): + return f"{self._params.get('artist')} - {self._params.get('title')}" + return self._params.get('artist') + + def set_status(self, status: str, **kwargs): + super().set_status(status, **kwargs) + match status: + case StatusCode.Waiting: + pass + case StatusCode.Processing: + if self._type == URLType.Song: + logger.debug(f"Task of {self._type} id {self._params.get('song_id')} was created") + else: + logger.info(f"Ripping {self._type}: {self._get_song_name()}") + case StatusCode.Parsing: + logger.info(f"Ripping {self._type}: {self._get_song_name()}") + case StatusCode.Downloading: + logger.info(f"Downloading {self._type}: {self._get_song_name()}") + case StatusCode.Decrypting: + logger.info(f"Decrypting {self._type}: {self._get_song_name()}") + case StatusCode.Saving: + pass + case StatusCode.Done: + logger.info( + f"{self._type.capitalize()} {self._get_song_name()} saved!") + case StatusCode.AlreadyExist: + logger.info( + f"{self._type.capitalize()}: {self._get_song_name()} already exists") + + def set_warning(self, warning: str, **kwargs): + super().set_warning(warning, **kwargs) + match warning: + case WarningCode.NoAvailableAccountForLyrics: + logger.warning(f"No account is available for getting lyrics of storefront {self._params.get('song_storefront').upper()}. " + f"Use storefront {self._params.get('storefront').upper()} to get lyrics") + case WarningCode.RetryableDecryptFailed: + logger.warning(f"Failed to decrypt song: {self._get_song_name()}, {kwargs['action']}") + case WarningCode.UnableGetLyrics: + logger.warning(f"Unable to get lyrics of song: {self._get_song_name()}") + + def set_error(self, error: str, **kwargs): + super().set_error(error, **kwargs) + match error: + case ErrorCode.AudioNotExist: + logger.error(f"Failed to download song: {self._get_song_name()}. Audio does not exist") + case ErrorCode.LosslessAudioNotExist: + logger.error(f"Failed to download song: {self._get_song_name()}. Lossless audio does not exist") + case ErrorCode.DecryptFailed: + logger.error(f"Failed to decrypt song: {self._get_song_name()}") + case ErrorCode.NotExistInStorefront: + logger.error( + f"Unable to download {self._type} {self._get_song_name()}. " + f"This {self._type} does not exist in storefront {self._params.get('storefront').upper()} " + f"and no device is available to decrypt it") + case ErrorCode.ForceModeM3U8NotExist: + logger.error(f"Failed to get m3u8 from API for song: {self._get_song_name()}") \ No newline at end of file