Compare commits

..

2 Commits

Author SHA1 Message Date
WorldObservationLog 35a520fcf7 fix: shell execute problem on Linux and macOS 6 months ago
WorldObservationLog b82817c20e feat: config to disable get m3u8 from device 6 months ago
  1. 2
      config.example.toml
  2. 1
      src/config.py
  3. 38
      src/mp4.py
  4. 2
      src/rip.py

@ -35,6 +35,8 @@ endpoint = ""
proxy = ""
# Number of concurrent song downloads
parallelNum = 1
# Get m3u8 from device
getM3u8FromDevice = true
# After enabling this feature, if the specified codec does not exist, the script will look for other codec to download
codecAlternative = true
# Priority for script to look for alternative codec

@ -26,6 +26,7 @@ class M3U8Api(BaseModel):
class Download(BaseModel):
proxy: str
parallelNum: int
getM3u8FromDevice: bool
codecAlternative: bool
codecPriority: list[str]
atmosConventToM4a: bool

@ -1,4 +1,5 @@
import subprocess
import sys
import uuid
from datetime import datetime
from io import BytesIO
@ -18,6 +19,13 @@ from src.types import *
from src.utils import find_best_codec, get_codec_from_codec_id, get_suffix
def if_shell():
if sys.platform in ('win32', 'cygwin', 'cli'):
return False
else:
return True
async def get_available_codecs(m3u8_url: str) -> Tuple[list[str], list[str]]:
parsed_m3u8 = m3u8.loads(await download_m3u8(m3u8_url), uri=m3u8_url)
codec_ids = [playlist.stream_info.audio for playlist in parsed_m3u8.playlists]
@ -70,10 +78,10 @@ async def extract_song(raw_song: bytes, codec: str) -> SongInfo:
nhml_name = (Path(tmp_dir.name) / Path(mp4_name).with_suffix('.nhml')).absolute()
media_name = (Path(tmp_dir.name) / Path(mp4_name).with_suffix('.media')).absolute()
subprocess.run(f"gpac -i {raw_mp4.absolute()} nhmlw:pckp=true -o {nhml_name}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
xml_name = (Path(tmp_dir.name) / Path(mp4_name).with_suffix('.xml')).absolute()
subprocess.run(f"mp4box -diso {raw_mp4.absolute()} -out {xml_name}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
decoder_params = None
with open(xml_name, "r") as f:
@ -89,7 +97,7 @@ async def extract_song(raw_song: bytes, codec: str) -> SongInfo:
alac_atom_name = (Path(tmp_dir.name) / Path(mp4_name).with_suffix('.atom')).absolute()
subprocess.run(
f"mp4extract moov/trak/mdia/minf/stbl/stsd/enca[0]/alac {raw_mp4.absolute()} {alac_atom_name}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
with open(alac_atom_name, "rb") as f:
decoder_params = f.read()
case Codec.AAC | Codec.AAC_DOWNMIX | Codec.AAC_BINAURAL:
@ -133,14 +141,14 @@ async def encapsulate(song_info: SongInfo, decrypted_media: bytes, atmos_convent
nhml_xml.NHNTStream["baseMediaFile"] = media.name
f.write(str(nhml_xml))
subprocess.run(f"gpac -i {nhml_name.absolute()} nhmlr -o {song_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
alac_params_atom_name = Path(tmp_dir.name) / Path(f"{name}.atom")
with open(alac_params_atom_name.absolute(), "wb") as f:
f.write(song_info.decoderParams)
final_m4a_name = Path(tmp_dir.name) / Path(f"{name}_final.m4a")
subprocess.run(
f"mp4edit --insert moov/trak/mdia/minf/stbl/stsd/alac:{alac_params_atom_name.absolute()} {song_name.absolute()} {final_m4a_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
song_name = final_m4a_name
case Codec.EC3 | Codec.AC3:
if not atmos_convent:
@ -148,7 +156,7 @@ async def encapsulate(song_info: SongInfo, decrypted_media: bytes, atmos_convent
f.write(decrypted_media)
else:
subprocess.run(f"gpac -i {media.absolute()} -o {song_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
case Codec.AAC_BINAURAL | Codec.AAC_DOWNMIX | Codec.AAC:
nhml_name = Path(tmp_dir.name) / Path(f"{name}.nhml")
info_name = Path(tmp_dir.name) / Path(f"{name}.info")
@ -161,9 +169,9 @@ async def encapsulate(song_info: SongInfo, decrypted_media: bytes, atmos_convent
nhml_xml.NHNTStream["streamType"] = "5"
f.write(str(nhml_xml))
subprocess.run(f"gpac -i {nhml_name.absolute()} nhmlr -o {song_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
subprocess.run(f'mp4box -brand "M4A " -ab "M4A " -ab "mp42" {song_name.absolute()}',
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
with open(song_name.absolute(), "rb") as f:
final_song = f.read()
tmp_dir.cleanup()
@ -189,7 +197,7 @@ async def write_metadata(song: bytes, metadata: SongMetadata, embed_metadata: li
subprocess.run(["mp4box", "-time", time, "-mtime", time, "-name", f"1={metadata.title}", "-itags",
":".join(["tool=", f"cover={absolute_cover_path}",
metadata.to_itags_params(embed_metadata)]),
song_name.absolute()], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
song_name.absolute()], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
with open(song_name.absolute(), "rb") as f:
embed_song = f.read()
tmp_dir.cleanup()
@ -207,9 +215,9 @@ async def fix_encapsulate(song: bytes) -> bytes:
with open(song_name.absolute(), "wb") as f:
f.write(song)
subprocess.run(["ffmpeg", "-y", "-i", song_name.absolute(), "-fflags", "+bitexact", "-c:a", "copy", "-c:v", "copy",
new_song_name.absolute()], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
new_song_name.absolute()], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
with open(new_song_name.absolute(), "rb") as f:
encapsulated_song = f.read()
encapsulated_song = f.read()
tmp_dir.cleanup()
return encapsulated_song
@ -229,9 +237,11 @@ async def fix_esds_box(raw_song: bytes, song: bytes) -> bytes:
with open(song_name.absolute(), "wb") as f:
f.write(song)
subprocess.run(
f"mp4extract moov/trak/mdia/minf/stbl/stsd/enca[0]/esds {raw_song_name.absolute()} {esds_name.absolute()}", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
subprocess.run(f"mp4edit --replace moov/trak/mdia/minf/stbl/stsd/mp4a/esds:{esds_name.absolute()} {song_name.absolute()} {final_song_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
f"mp4extract moov/trak/mdia/minf/stbl/stsd/enca[0]/esds {raw_song_name.absolute()} {esds_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
subprocess.run(
f"mp4edit --replace moov/trak/mdia/minf/stbl/stsd/mp4a/esds:{esds_name.absolute()} {song_name.absolute()} {final_song_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
with open(final_song_name.absolute(), "rb") as f:
final_song = f.read()
tmp_dir.cleanup()

@ -69,7 +69,7 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config
if not specified_m3u8 and not song_data.attributes.extendedAssetUrls.enhancedHls:
logger.error(f"Failed to download song: {song_metadata.artist} - {song_metadata.title}. Lossless audio does not exist")
return
if not specified_m3u8:
if not specified_m3u8 and config.download.getM3u8FromDevice:
device_m3u8 = await device.get_m3u8(song.id)
if device_m3u8:
specified_m3u8 = device_m3u8

Loading…
Cancel
Save