|
|
|
import asyncio
|
|
|
|
import time
|
|
|
|
from itertools import islice
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
import m3u8
|
|
|
|
import regex
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
|
|
|
|
from src.config import Download
|
|
|
|
from src.exceptions import NotTimeSyncedLyricsException
|
|
|
|
|
|
|
|
from src.types import *
|
|
|
|
|
|
|
|
|
|
|
|
def check_url(url):
|
|
|
|
pattern = regex.compile(
|
|
|
|
r'^(?:https:\/\/(?:beta\.music|music)\.apple\.com\/(\w{2})(?:\/album|\/album\/.+))\/(?:id)?(\d[^\D]+)(?:$|\?)')
|
|
|
|
result = regex.findall(pattern, url)
|
|
|
|
return result[0][0], result[0][1]
|
|
|
|
|
|
|
|
|
|
|
|
def check_playlist_url(url):
|
|
|
|
pattern = regex.compile(
|
|
|
|
r'^(?:https:\/\/(?:beta\.music|music)\.apple\.com\/(\w{2})(?:\/playlist|\/playlist\/.+))\/(?:id)?(pl\.[\w-]+)(?:$|\?)')
|
|
|
|
result = regex.findall(pattern, url)
|
|
|
|
return result[0][0], result[0][1]
|
|
|
|
|
|
|
|
|
|
|
|
def byte_length(i):
|
|
|
|
return (i.bit_length() + 7) // 8
|
|
|
|
|
|
|
|
|
|
|
|
def find_best_codec(parsed_m3u8: m3u8.M3U8, codec: str) -> Optional[m3u8.Playlist]:
|
|
|
|
available_medias = [playlist for playlist in parsed_m3u8.playlists
|
|
|
|
if regex.match(CodecRegex.get_pattern_by_codec(codec), playlist.stream_info.audio)]
|
|
|
|
if not available_medias:
|
|
|
|
return None
|
|
|
|
available_medias.sort(key=lambda x: x.stream_info.average_bandwidth, reverse=True)
|
|
|
|
return available_medias[0]
|
|
|
|
|
|
|
|
|
|
|
|
def chunk(it, size):
|
|
|
|
it = iter(it)
|
|
|
|
return iter(lambda: tuple(islice(it, size)), ())
|
|
|
|
|
|
|
|
|
|
|
|
def timeit(func):
|
|
|
|
async def process(func, *args, **params):
|
|
|
|
if asyncio.iscoroutinefunction(func):
|
|
|
|
print('this function is a coroutine: {}'.format(func.__name__))
|
|
|
|
return await func(*args, **params)
|
|
|
|
else:
|
|
|
|
print('this is not a coroutine')
|
|
|
|
return func(*args, **params)
|
|
|
|
|
|
|
|
async def helper(*args, **params):
|
|
|
|
print('{}.time'.format(func.__name__))
|
|
|
|
start = time.time()
|
|
|
|
result = await process(func, *args, **params)
|
|
|
|
|
|
|
|
# Test normal function route...
|
|
|
|
# result = await process(lambda *a, **p: print(*a, **p), *args, **params)
|
|
|
|
|
|
|
|
print('>>>', time.time() - start)
|
|
|
|
return result
|
|
|
|
|
|
|
|
return helper
|
|
|
|
|
|
|
|
|
|
|
|
def get_digit_from_string(text: str) -> int:
|
|
|
|
return int(''.join(filter(str.isdigit, text)))
|
|
|
|
|
|
|
|
|
|
|
|
def ttml_convent_to_lrc(ttml: str) -> str:
|
|
|
|
b = BeautifulSoup(ttml, features="xml")
|
|
|
|
lrc_lines = []
|
|
|
|
for item in b.tt.body.children:
|
|
|
|
for lyric in item.children:
|
|
|
|
h, m, s, ms = 0, 0, 0, 0
|
|
|
|
lyric_time: str = lyric.get("begin")
|
|
|
|
if not lyric_time:
|
|
|
|
raise NotTimeSyncedLyricsException
|
|
|
|
match lyric_time.count(":"):
|
|
|
|
case 0:
|
|
|
|
split_time = lyric_time.split(".")
|
|
|
|
s, ms = get_digit_from_string(split_time[0]), get_digit_from_string(split_time[1])
|
|
|
|
case 1:
|
|
|
|
split_time = lyric_time.split(":")
|
|
|
|
s_ms = split_time[-1]
|
|
|
|
del split_time[-1]
|
|
|
|
split_time.extend(s_ms.split("."))
|
|
|
|
m, s, ms = (get_digit_from_string(split_time[0]), get_digit_from_string(split_time[1]),
|
|
|
|
get_digit_from_string(split_time[2]))
|
|
|
|
case 2:
|
|
|
|
split_time = lyric_time.split(":")
|
|
|
|
s_ms = split_time[-1]
|
|
|
|
del split_time[-1]
|
|
|
|
split_time.extend(s_ms.split("."))
|
|
|
|
h, m, s, ms = (get_digit_from_string(split_time[0]), get_digit_from_string(split_time[1]),
|
|
|
|
get_digit_from_string(split_time[2]), get_digit_from_string(split_time[3]))
|
|
|
|
lrc_lines.append(
|
|
|
|
f"[{str(m + h * 60).rjust(2, '0')}:{str(s).rjust(2, '0')}.{str(int(ms / 10)).rjust(2, '0')}]{lyric.text}")
|
|
|
|
return "\n".join(lrc_lines)
|
|
|
|
|
|
|
|
|
|
|
|
def check_song_exists(metadata, config: Download, codec: str):
|
|
|
|
song_name = get_valid_filename(config.songNameFormat.format(**metadata.model_dump()))
|
|
|
|
dir_path = Path(config.dirPathFormat.format(**metadata.model_dump()))
|
|
|
|
if not config.atmosConventToM4a and codec == Codec.EC3:
|
|
|
|
return (Path(dir_path) / Path(song_name).with_suffix(".ec3")).exists()
|
|
|
|
elif not config.atmosConventToM4a and codec == Codec.AC3:
|
|
|
|
return (Path(dir_path) / Path(song_name).with_suffix(".ac3")).exists()
|
|
|
|
else:
|
|
|
|
return (Path(dir_path) / Path(song_name).with_suffix(".m4a")).exists()
|
|
|
|
|
|
|
|
|
|
|
|
def get_valid_filename(filename: str):
|
|
|
|
return "".join(i for i in filename if i not in r"\/:*?<>|")
|