feat: quality command

master
WorldObservationLog 5 months ago
parent f06a7c2530
commit def4436499
  1. 1
      pyproject.toml
  2. 59
      src/cmd.py
  3. 69
      src/quality.py

@ -25,6 +25,7 @@ async-lru = "^2.0.4"
winloop = {version = "^0.1.3", platform = "win32"} winloop = {version = "^0.1.3", platform = "win32"}
uvloop = [{version = "^0.19.0", platform = "linux"}, uvloop = [{version = "^0.19.0", platform = "linux"},
{version = "^0.19.0", platform = "darwin"}] {version = "^0.19.0", platform = "darwin"}]
prettytable = "^3.10.0"
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]

@ -5,12 +5,15 @@ import sys
from asyncio import Task from asyncio import Task
from loguru import logger from loguru import logger
from prettytable import PrettyTable
from prompt_toolkit import PromptSession, print_formatted_text, ANSI from prompt_toolkit import PromptSession, print_formatted_text, ANSI
from prompt_toolkit.patch_stdout import patch_stdout from prompt_toolkit.patch_stdout import patch_stdout
from src.adb import Device from src.adb import Device
from src.api import get_token, init_client_and_lock, upload_m3u8_to_api, get_song_info, get_real_url from src.api import get_token, init_client_and_lock, get_real_url, get_album_info
from src.config import Config from src.config import Config
from src.exceptions import CodecNotFoundException
from src.quality import get_available_song_audio_quality
from src.rip import rip_song, rip_album, rip_artist, rip_playlist from src.rip import rip_song, rip_album, rip_artist, rip_playlist
from src.types import GlobalAuthParams from src.types import GlobalAuthParams
from src.url import AppleMusicURL, URLType, Song from src.url import AppleMusicURL, URLType, Song
@ -53,6 +56,9 @@ class NewInteractiveShell:
choices=["alac", "ec3", "aac", "aac-binaural", "aac-downmix", "ac3"], choices=["alac", "ec3", "aac", "aac-binaural", "aac-downmix", "ac3"],
default="alac") default="alac")
m3u8_parser.add_argument("-f", "--force", default=False, action="store_true") m3u8_parser.add_argument("-f", "--force", default=False, action="store_true")
m3u8_parser.add_argument("-q", "--quality", default="", dest="quality")
quality_parser = subparser.add_parser("quality")
quality_parser.add_argument("url", type=str)
subparser.add_parser("exit") subparser.add_parser("exit")
logger.remove() logger.remove()
@ -88,6 +94,8 @@ class NewInteractiveShell:
await self.do_m3u8(args.url, args.codec, args.force) await self.do_m3u8(args.url, args.codec, args.force)
case "download-from-file" | "dlf": case "download-from-file" | "dlf":
await self.do_download_from_file(args.file, args.codec, args.force) await self.do_download_from_file(args.file, args.codec, args.force)
case "quality":
await self.do_quality(args.url)
case "exit": case "exit":
self.loop.stop() self.loop.stop()
sys.exit() sys.exit()
@ -141,6 +149,55 @@ class NewInteractiveShell:
self.tasks.append(task) self.tasks.append(task)
task.add_done_callback(self.tasks.remove) task.add_done_callback(self.tasks.remove)
async def do_quality(self, raw_url: str):
url = AppleMusicURL.parse_url(raw_url)
if not url:
real_url = await get_real_url(raw_url)
url = AppleMusicURL.parse_url(real_url)
if not url:
logger.error("Illegal URL!")
return
logger.info(f"Getting data for {url.type} id {url.id}")
available_device = await self._get_available_device(url.storefront)
global_auth_param = GlobalAuthParams.from_auth_params_and_token(available_device.get_auth_params(),
self.anonymous_access_token)
match url.type:
case URLType.Song:
try:
song_metadata, audio_qualities = await get_available_song_audio_quality(url, self.config,
global_auth_param,
available_device)
except CodecNotFoundException:
return
table = PrettyTable(
field_names=["Codec ID", "Codec", "Bitrate", "Average Bitrate", "Channels", "Sample Rate",
"Bit Depth"])
audio_qualities.sort(key=lambda x: x.bitrate, reverse=True)
table.add_rows([list(audio_quality.model_dump().values()) for audio_quality in audio_qualities])
print_formatted_text(
f"Available audio qualities for song: {song_metadata.artist} - {song_metadata.title}:")
print_formatted_text(table)
case URLType.Album:
album_info = await get_album_info(url.id, global_auth_param.anonymousAccessToken, url.storefront,
self.config.region.language)
for track in album_info.data[0].relationships.tracks.data:
song = Song(id=track.id, storefront=url.storefront, url="", type=URLType.Song)
try:
song_metadata, audio_qualities = await get_available_song_audio_quality(song, self.config,
global_auth_param,
available_device)
except CodecNotFoundException:
return
table = PrettyTable(
field_names=["Codec ID", "Codec", "Bitrate", "Average Bitrate", "Channels", "Sample Rate",
"Bit Depth"])
table.add_rows([list(audio_quality.model_dump().values()) for audio_quality in audio_qualities])
print_formatted_text(
f"Available audio qualities for song: {song_metadata.artist} - {song_metadata.title}:")
print_formatted_text(table)
case _:
logger.error("Unsupported link!")
async def _get_available_device(self, storefront: str): async def _get_available_device(self, storefront: str):
devices = self.storefront_device_mapping.get(storefront) devices = self.storefront_device_mapping.get(storefront)
if not devices: if not devices:

@ -0,0 +1,69 @@
from typing import Optional
import m3u8
from loguru import logger
from pydantic import BaseModel
from src.api import get_song_info, get_m3u8_from_api, download_m3u8
from src.config import Config, Device
from src.exceptions import CodecNotFoundException
from src.metadata import SongMetadata
from src.types import GlobalAuthParams
from src.url import Song
from src.utils import get_codec_from_codec_id
async def get_available_audio_quality(m3u8_url: str):
parsed_m3u8 = m3u8.loads(await download_m3u8(m3u8_url), uri=m3u8_url)
result = []
for playlist in parsed_m3u8.playlists:
if get_codec_from_codec_id(playlist.stream_info.audio):
result.append(AudioQuality(codec_id=playlist.stream_info.audio,
codec=get_codec_from_codec_id(playlist.stream_info.audio),
bitrate=playlist.stream_info.bandwidth,
average_bitrate=playlist.stream_info.average_bandwidth,
channels=playlist.media[0].channels,
sample_rate=playlist.media[0].extras.get("sample_rate", None),
bit_depth=playlist.media[0].extras.get("bit_depth", None)))
return result
class AudioQuality(BaseModel):
codec_id: str
codec: str
bitrate: int
average_bitrate: int
channels: str
sample_rate: Optional[int] = None
bit_depth: Optional[int] = None
async def get_available_song_audio_quality(song: Song, config: Config, auth_params: GlobalAuthParams,
device: Device) -> tuple[SongMetadata, list[AudioQuality]]:
specified_m3u8 = None
token = auth_params.anonymousAccessToken
song_data = await get_song_info(song.id, token, song.storefront, config.region.language)
song_metadata = SongMetadata.parse_from_song_data(song_data)
if config.m3u8Api.enable:
m3u8_url = await get_m3u8_from_api(config.m3u8Api.endpoint, song.id, config.m3u8Api.enable)
if m3u8_url:
specified_m3u8 = m3u8_url
logger.info(f"Use m3u8 from API for song: {song_metadata.artist} - {song_metadata.title}")
if not song_data.attributes.extendedAssetUrls:
logger.error(
f"Failed to get audio quality fo song: {song_metadata.artist} - {song_metadata.title}. Audio does not exist")
raise CodecNotFoundException
if not song_data.attributes.extendedAssetUrls.enhancedHls:
logger.error(
f"Failed to get audio quality for song: {song_metadata.artist} - {song_metadata.title}. Lossless audio does not exist")
raise CodecNotFoundException
if not specified_m3u8 and config.download.getM3u8FromDevice:
device_m3u8 = await device.get_m3u8(song.id)
if device_m3u8:
specified_m3u8 = device_m3u8
logger.info(f"Use m3u8 from device for song: {song_metadata.artist} - {song_metadata.title}")
if specified_m3u8:
audio_qualities = await get_available_audio_quality(specified_m3u8)
else:
audio_qualities = await get_available_audio_quality(song_data.attributes.extendedAssetUrls.enhancedHls)
return song_metadata, audio_qualities
Loading…
Cancel
Save