Apple Music decryption tool, based on zhaarey/apple-music-alac-atmos-downloader
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
AppleMusicDecrypt/src/mp4.py

166 lines
7.8 KiB

5 months ago
import subprocess
import uuid
from io import BytesIO
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Tuple
import m3u8
import regex
from bs4 import BeautifulSoup
from src.metadata import SongMetadata
from src.types import *
from src.utils import find_best_codec
async def extract_media(m3u8_url: str, codec: str) -> Tuple[str, list[str], str]:
parsed_m3u8 = m3u8.load(m3u8_url)
specifyPlaylist = find_best_codec(parsed_m3u8, codec)
selected_codec = specifyPlaylist.media[0].group_id
if not specifyPlaylist:
raise
stream = m3u8.load(specifyPlaylist.absolute_uri)
skds = [key.uri for key in stream.keys if regex.match('(skd?://[^"]*)', key.uri)]
keys = [prefetchKey]
key_suffix = CodecKeySuffix.KeySuffixDefault
match codec:
case Codec.ALAC:
key_suffix = CodecKeySuffix.KeySuffixAlac
case Codec.EC3:
key_suffix = CodecKeySuffix.KeySuffixAtmos
case Codec.AAC:
key_suffix = CodecKeySuffix.KeySuffixAAC
case Codec.AAC_BINAURAL:
key_suffix = CodecKeySuffix.KeySuffixAACBinaural
case Codec.AAC_DOWNMIX:
key_suffix = CodecKeySuffix.KeySuffixAACDownmix
for key in skds:
if key.endswith(key_suffix) or key.endswith(CodecKeySuffix.KeySuffixDefault):
keys.append(key)
return stream.segment_map[0].absolute_uri, keys, selected_codec
def extract_song(raw_song: bytes, codec: str) -> SongInfo:
    """Split a raw (still encrypted) MP4 into per-sample records.

    The bytes are written to a temporary directory and processed with
    external tools: ``gpac`` (``nhmlw:pckp=true``) to produce an ``.nhml``
    description, ``mp4box -diso`` to produce an XML box dump and, for ALAC
    only, ``mp4extract`` to pull out the ``alac`` atom. A sibling ``.media``
    file is then read; presumably gpac writes it next to the NHML file.

    Args:
        raw_song: Complete MP4 file contents.
        codec: Codec of the song; only ``Codec.ALAC`` triggers the extra
            decoder-parameter extraction.

    Returns:
        A ``SongInfo`` carrying the codec, the untouched input bytes, the
        list of ``SampleInfo`` entries, the raw NHML text and the ALAC atom
        bytes (``None`` for other codecs).

    Raises:
        OSError: If an external tool is missing or did not produce the
            expected output file.
        KeyError: If the box dump references more samples than the NHML
            file describes.
    """
    quiet = {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}
    # The context manager removes the directory even when a tool fails.
    with TemporaryDirectory() as tmp:
        work_dir = Path(tmp)
        mp4_name = uuid.uuid4().hex
        raw_mp4 = (work_dir / f"{mp4_name}.mp4").absolute()
        nhml_name = (work_dir / f"{mp4_name}.nhml").absolute()
        media_name = (work_dir / f"{mp4_name}.media").absolute()
        xml_name = (work_dir / f"{mp4_name}.xml").absolute()

        with open(raw_mp4, "wb") as f:
            f.write(raw_song)

        # Argument lists (not one command string) so the call works without
        # a shell on every platform and survives spaces in the temp path.
        subprocess.run(
            ["gpac", "-i", str(raw_mp4), "nhmlw:pckp=true", "-o", str(nhml_name)],
            **quiet,
        )
        subprocess.run(
            ["mp4box", "-diso", str(raw_mp4), "-out", str(xml_name)],
            **quiet,
        )

        with open(xml_name, "r") as f:
            info_xml = BeautifulSoup(f.read(), "xml")
        with open(nhml_name, "r") as f:
            raw_nhml = f.read()
        nhml = BeautifulSoup(raw_nhml, "xml")
        with open(media_name, "rb") as f:
            media = BytesIO(f.read())

        decoder_params = None
        if codec == Codec.ALAC:
            alac_atom_name = (work_dir / f"{mp4_name}.atom").absolute()
            subprocess.run(
                [
                    "mp4extract",
                    "moov/trak/mdia/minf/stbl/stsd/enca[0]/alac",
                    str(raw_mp4),
                    str(alac_atom_name),
                ],
                **quiet,
            )
            with open(alac_atom_name, "rb") as f:
                decoder_params = f.read()

        # Index NHML samples by their 1-based "number" attribute.
        nhnt_samples = {
            int(sample.get("number")): sample
            for sample in nhml.find_all("NHNTSample")
        }

        samples = []
        nhnt_sample_number = 0
        for moof in info_xml.find_all("MovieFragmentBox"):
            traf = moof.TrackFragmentBox
            desc_attr = traf.TrackFragmentHeaderBox.get("SampleDescriptionIndex")
            # The attribute is 1-based; a missing/empty value maps to index 0.
            index = int(desc_attr) - 1 if desc_attr else 0
            for trun in traf.find_all("TrackRunBox"):
                for _ in range(int(trun.get("SampleCount"))):
                    # NHML numbering runs continuously across all fragments.
                    nhnt_sample_number += 1
                    nhnt_sample = nhnt_samples[nhnt_sample_number]
                    # Sample payloads are consumed sequentially from the
                    # media buffer, each sized by its NHML dataLength.
                    sample_data = media.read(int(nhnt_sample.get("dataLength")))
                    duration = int(nhnt_sample.get("duration"))
                    samples.append(
                        SampleInfo(descIndex=index, data=sample_data, duration=duration)
                    )

    return SongInfo(codec=codec, raw=raw_song, samples=samples, nhml=raw_nhml,
                    decoderParams=decoder_params)
def encapsulate(song_info: SongInfo, decrypted_media: bytes, atmos_convent: bool) -> bytes:
tmp_dir = TemporaryDirectory()
name = uuid.uuid4().hex
media = Path(tmp_dir.name) / Path(name).with_suffix(".media")
with open(media.absolute(), "wb") as f:
f.write(decrypted_media)
if song_info.codec == Codec.EC3 and not atmos_convent:
song_name = Path(tmp_dir.name) / Path(name).with_suffix(".ec3")
else:
song_name = Path(tmp_dir.name) / Path(name).with_suffix(".m4a")
match song_info.codec:
case Codec.ALAC:
nhml_name = Path(tmp_dir.name) / Path(f"{name}.nhml")
with open(nhml_name.absolute(), "w", encoding="utf-8") as f:
nhml_xml = BeautifulSoup(song_info.nhml, features="xml")
nhml_xml.NHNTStream["baseMediaFile"] = media.name
f.write(str(nhml_xml))
subprocess.run(f"gpac -i {nhml_name.absolute()} nhmlr -o {song_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
alac_params_atom_name = Path(tmp_dir.name) / Path(f"{name}.atom")
with open(alac_params_atom_name.absolute(), "wb") as f:
f.write(song_info.decoderParams)
final_m4a_name = Path(tmp_dir.name) / Path(f"{name}_final.m4a")
subprocess.run(
f"mp4edit --insert moov/trak/mdia/minf/stbl/stsd/alac:{alac_params_atom_name.absolute()} {song_name.absolute()} {final_m4a_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
song_name = final_m4a_name
case Codec.EC3:
if not atmos_convent:
with open(song_name.absolute(), "wb") as f:
f.write(decrypted_media)
subprocess.run(f"gpac -i {media.absolute()} -o {song_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
case Codec.AAC_BINAURAL | Codec.AAC_DOWNMIX | Codec.AAC:
nhml_name = Path(tmp_dir.name) / Path(f"{name}.nhml")
with open(nhml_name.absolute(), "w", encoding="utf-8") as f:
nhml_xml = BeautifulSoup(song_info.nhml, features="xml")
nhml_xml.NHNTStream["baseMediaFile"] = media.name
del nhml_xml.NHNTStream["streamType"]
del nhml_xml.NHNTStream["objectTypeIndication"]
del nhml_xml.NHNTStream["specificInfoFile"]
nhml_xml.NHNTStream["mediaType"] = "soun"
nhml_xml.NHNTStream["mediaSubType"] = "mp4a"
f.write(str(nhml_xml))
subprocess.run(f"gpac -i {nhml_name.absolute()} nhmlr -o {song_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
with open(song_name.absolute(), "rb") as f:
final_song = f.read()
tmp_dir.cleanup()
return final_song
def write_metadata(song: bytes, metadata: SongMetadata, embed_metadata: list[str], cover_format: str) -> bytes:
    """Embed tags (and optionally cover art) into an M4A via ``mp4box``.

    The song is written to a temporary ``.m4a`` file, ``mp4box`` edits it in
    place, and the edited file is read back.

    Args:
        song: M4A file contents to tag.
        metadata: Supplies ``title``, ``cover`` and ``to_itags_params``.
        embed_metadata: Names of the fields to embed; the cover image is
            written to disk only when ``"cover"`` is listed.
        cover_format: File extension used for the temporary cover file and
            forwarded to ``metadata.to_itags_params``.

    Returns:
        The bytes of the temporary song file after ``mp4box`` has run.

    Raises:
        OSError: If ``mp4box`` cannot be started or the temporary files
            cannot be written or read.
    """
    # The context manager removes the directory even when mp4box cannot be
    # started or a file operation fails.
    with TemporaryDirectory() as tmp:
        work_dir = Path(tmp)
        name = uuid.uuid4().hex
        song_name = work_dir / f"{name}.m4a"
        with open(song_name.absolute(), "wb") as f:
            f.write(song)

        # Stays empty when no cover is requested, producing a bare "cover=".
        absolute_cover_path = ""
        if "cover" in embed_metadata:
            cover_path = work_dir / f"cover.{cover_format}"
            absolute_cover_path = cover_path.absolute()
            with open(cover_path.absolute(), "wb") as f:
                f.write(metadata.cover)

        itags = ":".join([
            "tool=\"\"",
            f"cover={absolute_cover_path}",
            metadata.to_itags_params(embed_metadata, cover_format),
        ])
        subprocess.run(
            ["mp4box", "-time", "0", "-mtime", "0", "-keep-utc",
             "-name", f"1={metadata.title}", "-itags", itags,
             song_name.absolute()],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )

        with open(song_name.absolute(), "rb") as f:
            embed_song = f.read()
    return embed_song