Compare commits

...

3 Commits

Author SHA1 Message Date
WorldObservationLog 21b8607406 feat: support clean rating 5 months ago
WorldObservationLog 4bedff5d4f Merge remote-tracking branch 'origin/master' 5 months ago
WorldObservationLog 70b07caa51 fix: many bugs about atmos 5 months ago
  1. 11
      src/metadata.py
  2. 8
      src/mp4.py
  3. 7
      src/rip.py
  4. 2
      src/save.py
  5. 10
      src/utils.py

@ -68,10 +68,19 @@ class SongMetadata(BaseModel):
record_company=song_data.relationships.albums.data[0].attributes.recordLabel, record_company=song_data.relationships.albums.data[0].attributes.recordLabel,
upc=song_data.relationships.albums.data[0].attributes.upc, upc=song_data.relationships.albums.data[0].attributes.upc,
isrc=song_data.attributes.isrc, isrc=song_data.attributes.isrc,
rtng=1 if song_data.attributes.contentRating and song_data.attributes.contentRating == 'explicit' else 0, rtng=cls._rating(song_data.attributes.contentRating),
song_id=song_data.id, album_id=song_data.relationships.albums.data[0].id song_id=song_data.id, album_id=song_data.relationships.albums.data[0].id
) )
@staticmethod
def _rating(content_rating: Optional[str]) -> int:
if not content_rating:
return 0
if content_rating == "explicit":
return 1
if content_rating == "clean":
return 2
def set_lyrics(self, lyrics: str): def set_lyrics(self, lyrics: str):
self.lyrics = lyrics self.lyrics = lyrics

@ -16,7 +16,8 @@ from src.api import download_m3u8
from src.exceptions import CodecNotFoundException from src.exceptions import CodecNotFoundException
from src.metadata import SongMetadata from src.metadata import SongMetadata
from src.types import * from src.types import *
from src.utils import find_best_codec, get_codec_from_codec_id, get_suffix, convent_mac_timestamp_to_datetime from src.utils import find_best_codec, get_codec_from_codec_id, get_suffix, convent_mac_timestamp_to_datetime, \
if_raw_atmos
def if_shell(): def if_shell():
@ -32,9 +33,9 @@ async def get_available_codecs(m3u8_url: str) -> Tuple[list[str], list[str]]:
codecs = [get_codec_from_codec_id(codec_id) for codec_id in codec_ids] codecs = [get_codec_from_codec_id(codec_id) for codec_id in codec_ids]
return codecs, codec_ids return codecs, codec_ids
async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata, async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata,
codec_priority: list[str], alternative_codec: bool = False, alacMax: Optional[int] = None, atmosMax: Optional[int] = None) -> Tuple[str, list[str], str, Optional[int], Optional[int]]: codec_priority: list[str], alternative_codec: bool = False, alacMax: Optional[int] = None,
atmosMax: Optional[int] = None) -> Tuple[str, list[str], str, Optional[int], Optional[int]]:
parsed_m3u8 = m3u8.loads(await download_m3u8(m3u8_url), uri=m3u8_url) parsed_m3u8 = m3u8.loads(await download_m3u8(m3u8_url), uri=m3u8_url)
specifyPlaylist = find_best_codec(parsed_m3u8, codec, alacMax, atmosMax) specifyPlaylist = find_best_codec(parsed_m3u8, codec, alacMax, atmosMax)
if not specifyPlaylist and alternative_codec: if not specifyPlaylist and alternative_codec:
@ -187,6 +188,7 @@ async def encapsulate(song_info: SongInfo, decrypted_media: bytes, atmos_convent
f.write(str(nhml_xml)) f.write(str(nhml_xml))
subprocess.run(f"gpac -i {nhml_name.absolute()} nhmlr -o {song_name.absolute()}", subprocess.run(f"gpac -i {nhml_name.absolute()} nhmlr -o {song_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
if not if_raw_atmos(song_info.codec, atmos_convent):
subprocess.run(f'mp4box -brand "M4A " -ab "M4A " -ab "mp42" {song_name.absolute()}', subprocess.run(f'mp4box -brand "M4A " -ab "M4A " -ab "mp42" {song_name.absolute()}',
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell()) stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
with open(song_name.absolute(), "rb") as f: with open(song_name.absolute(), "rb") as f:

@ -103,12 +103,13 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config
decrypted_song = await decrypt(song_info, keys, song_data, device) decrypted_song = await decrypt(song_info, keys, song_data, device)
song = await encapsulate(song_info, decrypted_song, config.download.atmosConventToM4a) song = await encapsulate(song_info, decrypted_song, config.download.atmosConventToM4a)
if not if_raw_atmos(codec, config.download.atmosConventToM4a): if not if_raw_atmos(codec, config.download.atmosConventToM4a):
metadata_song = await write_metadata(song, song_metadata, config.metadata.embedMetadata, song = await write_metadata(song, song_metadata, config.metadata.embedMetadata,
config.download.coverFormat, song_info.params) config.download.coverFormat, song_info.params)
song = await fix_encapsulate(metadata_song) if codec != Codec.EC3 or codec != Codec.EC3:
song = await fix_encapsulate(song)
if codec == Codec.AAC or codec == Codec.AAC_DOWNMIX or codec == Codec.AAC_BINAURAL: if codec == Codec.AAC or codec == Codec.AAC_DOWNMIX or codec == Codec.AAC_BINAURAL:
song = await fix_esds_box(song_info.raw, song) song = await fix_esds_box(song_info.raw, song)
filename = await save(song, codec.upper(), song_metadata, config.download, playlist) filename = await save(song, codec, song_metadata, config.download, playlist)
logger.info(f"Song {song_metadata.artist} - {song_metadata.title} saved!") logger.info(f"Song {song_metadata.artist} - {song_metadata.title} saved!")
if config.download.afterDownloaded: if config.download.afterDownloaded:
command = config.download.afterDownloaded.format(filename=filename) command = config.download.afterDownloaded.format(filename=filename)

@ -8,7 +8,7 @@ from src.utils import ttml_convent_to_lrc, get_song_name_and_dir_path, get_suffi
async def save(song: bytes, codec: str, metadata: SongMetadata, config: Download, playlist: PlaylistInfo = None): async def save(song: bytes, codec: str, metadata: SongMetadata, config: Download, playlist: PlaylistInfo = None):
song_name, dir_path = get_song_name_and_dir_path(codec, config, metadata, playlist) song_name, dir_path = get_song_name_and_dir_path(codec.upper(), config, metadata, playlist)
if not dir_path.exists() or not dir_path.is_dir(): if not dir_path.exists() or not dir_path.is_dir():
os.makedirs(dir_path.absolute()) os.makedirs(dir_path.absolute())
song_path = dir_path / Path(song_name + get_suffix(codec, config.atmosConventToM4a)) song_path = dir_path / Path(song_name + get_suffix(codec, config.atmosConventToM4a))

@ -142,16 +142,16 @@ def get_song_id_from_m3u8(m3u8_url: str) -> str:
return regex.search(r"_A(\d*)_", parsed_m3u8.playlists[0].uri)[1] return regex.search(r"_A(\d*)_", parsed_m3u8.playlists[0].uri)[1]
def if_raw_atmos(codec: str, save_raw_atmos: bool): def if_raw_atmos(codec: str, convent_atmos: bool):
if (codec == Codec.EC3 or codec == Codec.AC3) and save_raw_atmos: if (codec == Codec.EC3 or codec == Codec.AC3) and not convent_atmos:
return True return True
return False return False
def get_suffix(codec: str, save_raw_atmos: bool): def get_suffix(codec: str, convent_atmos: bool):
if not save_raw_atmos and codec == Codec.EC3: if not convent_atmos and codec == Codec.EC3:
return ".ec3" return ".ec3"
elif not save_raw_atmos and codec == Codec.AC3: elif not convent_atmos and codec == Codec.AC3:
return ".ac3" return ".ac3"
else: else:
return ".m4a" return ".m4a"

Loading…
Cancel
Save