feat: m3u8 download

pull/1/head
WorldObservationLog 5 months ago
parent 2d0ea60556
commit 62702df164
  1. 3
      config.toml
  2. 8
      src/cmd.py
  3. 3
      src/config.py
  4. 14
      src/rip.py
  5. 5
      src/utils.py

@ -1,4 +1,5 @@
[language] [region]
defaultStorefront = "jp"
language = "zh_CN" language = "zh_CN"
[[devices]] [[devices]]

@ -14,6 +14,8 @@ from src.config import Config
from src.rip import rip_song, rip_album from src.rip import rip_song, rip_album
from src.types import GlobalAuthParams from src.types import GlobalAuthParams
from src.url import AppleMusicURL, URLType from src.url import AppleMusicURL, URLType
from src.url import AppleMusicURL, URLType, Song
from src.utils import get_song_id_from_m3u8
class NewInteractiveShell: class NewInteractiveShell:
@ -38,6 +40,12 @@ class NewInteractiveShell:
choices=["alac", "ec3", "aac", "aac-binaural", "aac-downmix", "ac3"], choices=["alac", "ec3", "aac", "aac-binaural", "aac-downmix", "ac3"],
default="alac") default="alac")
download_parser.add_argument("-f", "--force", type=bool, default=False) download_parser.add_argument("-f", "--force", type=bool, default=False)
m3u8_parser = subparser.add_parser("m3u8")
m3u8_parser.add_argument("url", type=str)
m3u8_parser.add_argument("-c", "--codec",
choices=["alac", "ec3", "aac", "aac-binaural", "aac-downmix", "ac3"],
default="alac")
m3u8_parser.add_argument("-f", "--force", type=bool, default=False)
subparser.add_parser("exit") subparser.add_parser("exit")
logger.remove() logger.remove()

@ -3,8 +3,9 @@ import tomllib
from pydantic import BaseModel from pydantic import BaseModel
class Language(BaseModel): class Region(BaseModel):
language: str language: str
defaultStorefront: str
class Device(BaseModel): class Device(BaseModel):

@ -16,10 +16,10 @@ from src.utils import check_song_exists
@logger.catch @logger.catch
async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device, async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device,
force_save: bool = False): force_save: bool = False, specified_m3u8: str = ""):
logger.debug(f"Task of song id {song.id} was created") logger.debug(f"Task of song id {song.id} was created")
token = auth_params.anonymousAccessToken token = auth_params.anonymousAccessToken
song_data = await get_info_from_adam(song.id, token, song.storefront, config.language.language) song_data = await get_info_from_adam(song.id, token, song.storefront, config.region.language)
song_metadata = SongMetadata.parse_from_song_data(song_data) song_metadata = SongMetadata.parse_from_song_data(song_data)
logger.info(f"Ripping song: {song_metadata.artist} - {song_metadata.title}") logger.info(f"Ripping song: {song_metadata.artist} - {song_metadata.title}")
if not force_save and check_song_exists(song_metadata, config.download, codec): if not force_save and check_song_exists(song_metadata, config.download, codec):
@ -28,8 +28,12 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config
await song_metadata.get_cover(config.download.coverFormat) await song_metadata.get_cover(config.download.coverFormat)
if song_data.attributes.hasTimeSyncedLyrics: if song_data.attributes.hasTimeSyncedLyrics:
lyrics = await get_song_lyrics(song.id, song.storefront, auth_params.accountAccessToken, lyrics = await get_song_lyrics(song.id, song.storefront, auth_params.accountAccessToken,
auth_params.dsid, auth_params.accountToken, config.language.language) auth_params.dsid, auth_params.accountToken, config.region.language)
song_metadata.lyrics = lyrics song_metadata.lyrics = lyrics
if specified_m3u8:
song_uri, keys = await extract_media(specified_m3u8, codec, song_metadata,
config.download.codecPriority, config.download.codecAlternative)
else:
song_uri, keys = await extract_media(song_data.attributes.extendedAssetUrls.enhancedHls, codec, song_metadata, song_uri, keys = await extract_media(song_data.attributes.extendedAssetUrls.enhancedHls, codec, song_metadata,
config.download.codecPriority, config.download.codecAlternative) config.download.codecPriority, config.download.codecAlternative)
logger.info(f"Downloading song: {song_metadata.artist} - {song_metadata.title}") logger.info(f"Downloading song: {song_metadata.artist} - {song_metadata.title}")
@ -51,12 +55,12 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config
async def rip_album(album: Album, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device, async def rip_album(album: Album, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device,
force_save: bool = False): force_save: bool = False):
album_info = await get_meta(album.id, auth_params.anonymousAccessToken, album.storefront, config.language.language) album_info = await get_meta(album.id, auth_params.anonymousAccessToken, album.storefront, config.region.language)
logger.info(f"Ripping Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}") logger.info(f"Ripping Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}")
async with asyncio.TaskGroup() as tg: async with asyncio.TaskGroup() as tg:
for track in album_info.data[0].relationships.tracks.data: for track in album_info.data[0].relationships.tracks.data:
song = Song(id=track.id, storefront=album.storefront, url="", type=URLType.Song) song = Song(id=track.id, storefront=album.storefront, url="", type=URLType.Song)
tg.create_task(rip_song(song, auth_params, codec, config, device, force_save)) tg.create_task(rip_song(song, auth_params, codec, config, device, force_save=force_save))
logger.info( logger.info(
f"Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name} finished ripping") f"Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name} finished ripping")

@ -124,3 +124,8 @@ def get_codec_from_codec_id(codec_id: str) -> str:
if regex.match(CodecRegex.get_pattern_by_codec(codec), codec_id): if regex.match(CodecRegex.get_pattern_by_codec(codec), codec_id):
return codec return codec
return "" return ""
def get_song_id_from_m3u8(m3u8_url: str) -> str:
parsed_m3u8 = m3u8.load(m3u8_url)
return regex.search(r"_A(\d*)_", parsed_m3u8.playlists[0].uri)[1]

Loading…
Cancel
Save