|
|
|
@ -1,4 +1,5 @@ |
|
|
|
|
import subprocess |
|
|
|
|
import sys |
|
|
|
|
import uuid |
|
|
|
|
from datetime import datetime |
|
|
|
|
from io import BytesIO |
|
|
|
@ -18,6 +19,13 @@ from src.types import * |
|
|
|
|
from src.utils import find_best_codec, get_codec_from_codec_id, get_suffix |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def if_shell(): |
|
|
|
|
if sys.platform in ('win32', 'cygwin', 'cli'): |
|
|
|
|
return False |
|
|
|
|
else: |
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def get_available_codecs(m3u8_url: str) -> Tuple[list[str], list[str]]: |
|
|
|
|
parsed_m3u8 = m3u8.loads(await download_m3u8(m3u8_url), uri=m3u8_url) |
|
|
|
|
codec_ids = [playlist.stream_info.audio for playlist in parsed_m3u8.playlists] |
|
|
|
@ -70,10 +78,10 @@ async def extract_song(raw_song: bytes, codec: str) -> SongInfo: |
|
|
|
|
nhml_name = (Path(tmp_dir.name) / Path(mp4_name).with_suffix('.nhml')).absolute() |
|
|
|
|
media_name = (Path(tmp_dir.name) / Path(mp4_name).with_suffix('.media')).absolute() |
|
|
|
|
subprocess.run(f"gpac -i {raw_mp4.absolute()} nhmlw:pckp=true -o {nhml_name}", |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) |
|
|
|
|
xml_name = (Path(tmp_dir.name) / Path(mp4_name).with_suffix('.xml')).absolute() |
|
|
|
|
subprocess.run(f"mp4box -diso {raw_mp4.absolute()} -out {xml_name}", |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) |
|
|
|
|
decoder_params = None |
|
|
|
|
|
|
|
|
|
with open(xml_name, "r") as f: |
|
|
|
@ -89,7 +97,7 @@ async def extract_song(raw_song: bytes, codec: str) -> SongInfo: |
|
|
|
|
alac_atom_name = (Path(tmp_dir.name) / Path(mp4_name).with_suffix('.atom')).absolute() |
|
|
|
|
subprocess.run( |
|
|
|
|
f"mp4extract moov/trak/mdia/minf/stbl/stsd/enca[0]/alac {raw_mp4.absolute()} {alac_atom_name}", |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) |
|
|
|
|
with open(alac_atom_name, "rb") as f: |
|
|
|
|
decoder_params = f.read() |
|
|
|
|
case Codec.AAC | Codec.AAC_DOWNMIX | Codec.AAC_BINAURAL: |
|
|
|
@ -133,14 +141,14 @@ async def encapsulate(song_info: SongInfo, decrypted_media: bytes, atmos_convent |
|
|
|
|
nhml_xml.NHNTStream["baseMediaFile"] = media.name |
|
|
|
|
f.write(str(nhml_xml)) |
|
|
|
|
subprocess.run(f"gpac -i {nhml_name.absolute()} nhmlr -o {song_name.absolute()}", |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) |
|
|
|
|
alac_params_atom_name = Path(tmp_dir.name) / Path(f"{name}.atom") |
|
|
|
|
with open(alac_params_atom_name.absolute(), "wb") as f: |
|
|
|
|
f.write(song_info.decoderParams) |
|
|
|
|
final_m4a_name = Path(tmp_dir.name) / Path(f"{name}_final.m4a") |
|
|
|
|
subprocess.run( |
|
|
|
|
f"mp4edit --insert moov/trak/mdia/minf/stbl/stsd/alac:{alac_params_atom_name.absolute()} {song_name.absolute()} {final_m4a_name.absolute()}", |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) |
|
|
|
|
song_name = final_m4a_name |
|
|
|
|
case Codec.EC3 | Codec.AC3: |
|
|
|
|
if not atmos_convent: |
|
|
|
@ -148,7 +156,7 @@ async def encapsulate(song_info: SongInfo, decrypted_media: bytes, atmos_convent |
|
|
|
|
f.write(decrypted_media) |
|
|
|
|
else: |
|
|
|
|
subprocess.run(f"gpac -i {media.absolute()} -o {song_name.absolute()}", |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) |
|
|
|
|
case Codec.AAC_BINAURAL | Codec.AAC_DOWNMIX | Codec.AAC: |
|
|
|
|
nhml_name = Path(tmp_dir.name) / Path(f"{name}.nhml") |
|
|
|
|
info_name = Path(tmp_dir.name) / Path(f"{name}.info") |
|
|
|
@ -161,9 +169,9 @@ async def encapsulate(song_info: SongInfo, decrypted_media: bytes, atmos_convent |
|
|
|
|
nhml_xml.NHNTStream["streamType"] = "5" |
|
|
|
|
f.write(str(nhml_xml)) |
|
|
|
|
subprocess.run(f"gpac -i {nhml_name.absolute()} nhmlr -o {song_name.absolute()}", |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) |
|
|
|
|
subprocess.run(f'mp4box -brand "M4A " -ab "M4A " -ab "mp42" {song_name.absolute()}', |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) |
|
|
|
|
with open(song_name.absolute(), "rb") as f: |
|
|
|
|
final_song = f.read() |
|
|
|
|
tmp_dir.cleanup() |
|
|
|
@ -189,7 +197,7 @@ async def write_metadata(song: bytes, metadata: SongMetadata, embed_metadata: li |
|
|
|
|
subprocess.run(["mp4box", "-time", time, "-mtime", time, "-name", f"1={metadata.title}", "-itags", |
|
|
|
|
":".join(["tool=", f"cover={absolute_cover_path}", |
|
|
|
|
metadata.to_itags_params(embed_metadata)]), |
|
|
|
|
song_name.absolute()], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
|
|
song_name.absolute()], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) |
|
|
|
|
with open(song_name.absolute(), "rb") as f: |
|
|
|
|
embed_song = f.read() |
|
|
|
|
tmp_dir.cleanup() |
|
|
|
@ -207,9 +215,9 @@ async def fix_encapsulate(song: bytes) -> bytes: |
|
|
|
|
with open(song_name.absolute(), "wb") as f: |
|
|
|
|
f.write(song) |
|
|
|
|
subprocess.run(["ffmpeg", "-y", "-i", song_name.absolute(), "-fflags", "+bitexact", "-c:a", "copy", "-c:v", "copy", |
|
|
|
|
new_song_name.absolute()], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
|
|
new_song_name.absolute()], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) |
|
|
|
|
with open(new_song_name.absolute(), "rb") as f: |
|
|
|
|
encapsulated_song = f.read() |
|
|
|
|
encapsulated_song = f.read() |
|
|
|
|
tmp_dir.cleanup() |
|
|
|
|
return encapsulated_song |
|
|
|
|
|
|
|
|
@ -229,9 +237,11 @@ async def fix_esds_box(raw_song: bytes, song: bytes) -> bytes: |
|
|
|
|
with open(song_name.absolute(), "wb") as f: |
|
|
|
|
f.write(song) |
|
|
|
|
subprocess.run( |
|
|
|
|
f"mp4extract moov/trak/mdia/minf/stbl/stsd/enca[0]/esds {raw_song_name.absolute()} {esds_name.absolute()}", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
|
|
subprocess.run(f"mp4edit --replace moov/trak/mdia/minf/stbl/stsd/mp4a/esds:{esds_name.absolute()} {song_name.absolute()} {final_song_name.absolute()}", |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
|
|
f"mp4extract moov/trak/mdia/minf/stbl/stsd/enca[0]/esds {raw_song_name.absolute()} {esds_name.absolute()}", |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) |
|
|
|
|
subprocess.run( |
|
|
|
|
f"mp4edit --replace moov/trak/mdia/minf/stbl/stsd/mp4a/esds:{esds_name.absolute()} {song_name.absolute()} {final_song_name.absolute()}", |
|
|
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) |
|
|
|
|
with open(final_song_name.absolute(), "rb") as f: |
|
|
|
|
final_song = f.read() |
|
|
|
|
tmp_dir.cleanup() |
|
|
|
|