|
|
@ -6,11 +6,12 @@ from typing import Optional |
|
|
|
import frida |
|
|
|
import frida |
|
|
|
import regex |
|
|
|
import regex |
|
|
|
from loguru import logger |
|
|
|
from loguru import logger |
|
|
|
|
|
|
|
from tenacity import retry, retry_if_exception_type, wait_random_exponential, stop_after_attempt |
|
|
|
from ppadb.client import Client as AdbClient |
|
|
|
from ppadb.client import Client as AdbClient |
|
|
|
from ppadb.device import Device as AdbDevice |
|
|
|
from ppadb.device import Device as AdbDevice |
|
|
|
|
|
|
|
|
|
|
|
from src.exceptions import ADBConnectException, FailedGetAuthParamException, \ |
|
|
|
from src.exceptions import ADBConnectException, FailedGetAuthParamException, \ |
|
|
|
FridaNotRunningException |
|
|
|
FridaNotRunningException, FailedGetM3U8FromDeviceException |
|
|
|
from src.types import AuthParams |
|
|
|
from src.types import AuthParams |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -43,6 +44,7 @@ class Device: |
|
|
|
decryptLock: asyncio.Lock |
|
|
|
decryptLock: asyncio.Lock |
|
|
|
hyperDecryptDevices: list[HyperDecryptDevice] = [] |
|
|
|
hyperDecryptDevices: list[HyperDecryptDevice] = [] |
|
|
|
m3u8Script: frida.core.Script |
|
|
|
m3u8Script: frida.core.Script |
|
|
|
|
|
|
|
_m3u8ScriptLock = asyncio.Lock() |
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, host="127.0.0.1", port=5037, su_method: str = "su -c"): |
|
|
|
def __init__(self, host="127.0.0.1", port=5037, su_method: str = "su -c"): |
|
|
|
self.client = AdbClient(host, port) |
|
|
|
self.client = AdbClient(host, port) |
|
|
@ -50,17 +52,20 @@ class Device: |
|
|
|
self.host = host |
|
|
|
self.host = host |
|
|
|
self.decryptLock = asyncio.Lock() |
|
|
|
self.decryptLock = asyncio.Lock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@retry(retry=retry_if_exception_type(FailedGetM3U8FromDeviceException), wait=wait_random_exponential(min=4, max=20), |
|
|
|
|
|
|
|
stop=stop_after_attempt(8)) |
|
|
|
async def get_m3u8(self, adam_id: str): |
|
|
|
async def get_m3u8(self, adam_id: str): |
|
|
|
try: |
|
|
|
async with self._m3u8ScriptLock: |
|
|
|
result: str = await self.m3u8Script.exports_async.getm3u8(adam_id) |
|
|
|
try: |
|
|
|
except frida.core.RPCException: |
|
|
|
result = await self.m3u8Script.exports_async.getm3u8(adam_id) |
|
|
|
# The script takes 8 seconds to start. |
|
|
|
except frida.core.RPCException or isinstance(result, int): |
|
|
|
# If the script does not start when the function is called, wait 8 seconds and call again. |
|
|
|
# The script takes 8 seconds to start. |
|
|
|
await asyncio.sleep(8) |
|
|
|
# If the script does not start when the function is called, wait 8 seconds and call again. |
|
|
|
result: str = await self.m3u8Script.exports_async.getm3u8(adam_id) |
|
|
|
await asyncio.sleep(8) |
|
|
|
if result.isdigit(): |
|
|
|
result = await self.m3u8Script.exports_async.getm3u8(adam_id) |
|
|
|
return None |
|
|
|
if isinstance(result, int): |
|
|
|
return result |
|
|
|
raise FailedGetM3U8FromDeviceException |
|
|
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
def connect(self, host: str, port: int): |
|
|
|
def connect(self, host: str, port: int): |
|
|
|
try: |
|
|
|
try: |
|
|
|