feat: check availability to decrypt

pull/8/head
WorldObservationLog 4 months ago
parent 5885262fe8
commit daa7bf2ae9
  1. 45
      src/api.py
  2. 17
      src/rip.py

@ -256,3 +256,48 @@ async def download_m3u8(m3u8_url: str) -> str:
async def get_real_url(url: str): async def get_real_url(url: str):
req = await client.get(url, follow_redirects=True, headers={"User-Agent": user_agent_browser}) req = await client.get(url, follow_redirects=True, headers={"User-Agent": user_agent_browser})
return str(req.url) return str(req.url)
@alru_cache
@retry(retry=retry_if_exception_type(httpx.HTTPError), stop=stop_after_attempt(5))
async def get_album_by_upc(upc: str, storefront: str, token: str):
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/albums",
params={"filter[upc]": upc},
headers={"Authorization": f"Bearer {token}", "Origin": "https://music.apple.com"})
resp = req.json()
try:
if resp["data"]:
return req.json()
else:
return None
except KeyError:
logger.debug(f"UPC: {upc}, Storefront: {storefront}")
return None
@alru_cache
@retry(retry=retry_if_exception_type(
(httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError, httpcore.RemoteProtocolError)),
stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def exist_on_storefront_by_song_id(song_id: str, storefront: str, check_storefront: str, token: str, lang: str):
if storefront == check_storefront:
return True
song = await get_song_info(song_id, token, storefront, lang)
album_id = song.relationships.albums.data[0].id
album = await get_album_info(album_id, token, storefront, lang)
upc = album.data[0].attributes.upc
upc_result = await get_album_by_upc(upc, check_storefront, token)
return bool(upc_result)
@alru_cache
@retry(retry=retry_if_exception_type(
(httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError, httpcore.RemoteProtocolError)),
stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def exist_on_storefront_by_album_id(album_id: str, storefront: str, check_storefront: str, token: str, lang: str):
album = await get_album_info(album_id, token, storefront, lang)
upc = album.data[0].attributes.upc
upc_result = await get_album_by_upc(upc, check_storefront, token)
return bool(upc_result)

@ -5,8 +5,9 @@ from loguru import logger
from src.api import (get_song_info, get_song_lyrics, get_album_info, download_song, from src.api import (get_song_info, get_song_lyrics, get_album_info, download_song,
get_m3u8_from_api, get_artist_info, get_songs_from_artist, get_albums_from_artist, get_m3u8_from_api, get_artist_info, get_songs_from_artist, get_albums_from_artist,
get_playlist_info_and_tracks) get_playlist_info_and_tracks, exist_on_storefront_by_album_id, exist_on_storefront_by_song_id)
from src.config import Config, Device from src.config import Config
from src.adb import Device
from src.decrypt import decrypt from src.decrypt import decrypt
from src.metadata import SongMetadata from src.metadata import SongMetadata
from src.models import PlaylistInfo from src.models import PlaylistInfo
@ -30,6 +31,13 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config
if playlist: if playlist:
song_metadata.set_playlist_index(playlist.songIdIndexMapping.get(song.id)) song_metadata.set_playlist_index(playlist.songIdIndexMapping.get(song.id))
logger.info(f"Ripping song: {song_metadata.artist} - {song_metadata.title}") logger.info(f"Ripping song: {song_metadata.artist} - {song_metadata.title}")
if not await exist_on_storefront_by_song_id(song.id, song.storefront, auth_params.storefront,
auth_params.anonymousAccessToken, config.region.language):
logger.error(
f"Unable to download song {song_metadata.artist} - {song_metadata.title}. "
f"This song does not exist in storefront {auth_params.storefront.upper()} "
f"and no device is available to decrypt it")
return
if not force_save and check_song_exists(song_metadata, config.download, codec, playlist): if not force_save and check_song_exists(song_metadata, config.download, codec, playlist):
logger.info(f"Song: {song_metadata.artist} - {song_metadata.title} already exists") logger.info(f"Song: {song_metadata.artist} - {song_metadata.title} already exists")
return return
@ -77,6 +85,11 @@ async def rip_album(album: Album, auth_params: GlobalAuthParams, codec: str, con
album_info = await get_album_info(album.id, auth_params.anonymousAccessToken, album.storefront, album_info = await get_album_info(album.id, auth_params.anonymousAccessToken, album.storefront,
config.region.language) config.region.language)
logger.info(f"Ripping Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}") logger.info(f"Ripping Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}")
if not await exist_on_storefront_by_album_id(album.id, album.storefront, auth_params.storefront, auth_params.anonymousAccessToken, config.region.language):
logger.error(f"Unable to download album {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}. "
f"This album does not exist in storefront {auth_params.storefront.upper()} "
f"and no device is available to decrypt it")
return
async with asyncio.TaskGroup() as tg: async with asyncio.TaskGroup() as tg:
for track in album_info.data[0].relationships.tracks.data: for track in album_info.data[0].relationships.tracks.data:
song = Song(id=track.id, storefront=album.storefront, url="", type=URLType.Song) song = Song(id=track.id, storefront=album.storefront, url="", type=URLType.Song)

Loading…
Cancel
Save