You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ttt/tx.py

587 lines
71 KiB

import base64
import hashlib
import json
import random
import time
from pathlib import Path
from urllib.parse import parse_qsl, urlsplit
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from tabulate import tabulate
from wasmer_compiler_cranelift import Compiler
from wasmer import Store, Type, Function, Memory, Module, ImportObject, engine, Instance, Table
from pywidevine.L3.cdm import deviceconfig
from pywidevine.L3.decrypt.wvdecryptcustom import WvDecrypt
from tools import rsa_dec, aes_decrypt, djb2Hash, b64decode, sha1withrsa, updata_yaml, dealck, get_size
class Txckey:
def __init__(self):
appCodeName = 'Mozilla'
appName = 'Netscape'
appVersion = 'Win32'
platform = 'Win32',
self.appCodeName = appCodeName
self.appName = appName
self.appVersion = appVersion
self.platform = platform
self.userAgent = "mozilla/5.0 (windows nt 10.0; win64; x64) applewebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
self.key = "2A5AA60178AA6C8DA662E443773A6C4E"
self.iv = "CFAC216FAA2D396013575D4055C63350"
self.ptrs = []
self.pr = None
self.table = None
self.memory = None
self.func = None
wasmBinaryFile = "data:application/octet-stream;base64," \
"AGFzbQEAAAABwgEbYAF/AX9gAn9/AGADf39/AGACf38Bf2ABfwBgA39/fwF/YAAAYAR/fn9/AX9gA39/fgF/YAZ/f35" \
"/fn8Bf2AFf39/f38AYAZ/fH9/f38Bf2ADf39+AGAEf39/fwBgAn9+AGAGf39/f39/AX9gAAF+YAR/f39+AGAEf39/fwF" \
"/YAABfGAAAX9gCn9/f39/f39/f38Bf2ACfH8BfGAIf39/f39/f38AYAl/f39/f39/f38Bf2AFf39" \
"/f38Bf2ACfn8BfwIpBANoYzIDNkdqAAYDaGMyA3h1WAACA2hjMgMyZmIAEwNoYzIDRWpXAAADV1YOAQAEAAoCAAEDAgACDwEAAwYAAQECAAQEAAAACwQDCAUMABABAAUCABESAQEAAAECAQACAA0BAhQVFgAAFwwDAgoBABgHAAACBgQZAAkCDRoBAgABAgQFAXABCQkFBgEBgAKAAgYIAX8BQcCUBAsHTw0DY3NwAgADSk9qAE0Db3RtAD0DZ0RwAAsDNHlUAQADdENqAB0DMnpRABwDYlc4ADwDWU9kAE4DWmhuAB8DSmpGAE8DWVRpABEDNTVCAEgJDgEAQQELCBQjDUlRIDMkCrCWAlbaAQEBfyMAQRBrIgIgADYCDCACIAE3AwAgAigCDCACKQMAPAAAIAIgAikDAEIIiDcDACACKAIMIAIpAwA8AAEgAiACKQMAQgiINwMAIAIoAgwgAikDADwAAiACIAIpAwBCCIg3AwAgAigCDCACKQMAPAADIAIgAikDAEIIiDcDACACKAIMIAIpAwA8AAQgAiACKQMAQgiINwMAIAIoAgwgAikDADwABSACIAIpAwBCCIg3AwAgAigCDCACKQMAPAAGIAIgAikDAEIIiDcDACACKAIMIAIpAwA8AAcLNQEBfyMAQRBrIgIkACACIAA2AgwgAiABNgIIIAIoAgghACACKAIMEBYgADYCBCACQRBqJAALDQAgABATGkHv////Bws8AQF/IwBBEGsiASQAIAEgADYCDCABKAIMIgAgAUELaiABQQpqECsjAEEQayAANgIMIAAQISABQRBqJAALNwEBfyMAQRBrIgEkACABIAA2AgwCfyABKAIMIgAQP0EBcQRAIAAQJgwBCyAAEBoLIAFBEGokAAtvAQF/IwBBgAJrIgUkAAJAIAIgA0wNACAEQYDABHENACAFIAFB/wFxIAIgA2siA0GAAiADQYACSSIBGxAqGiABRQRAA0AgACAFQYACEFkgA0GAAmsiA0H/AUsNAAsLIAAgBSADEFkLIAVBgAJqJAALhAEBAX8jAEEQayIDJAAgAyAANgIMIAMgATYCCCADIAI2AgQgAygCBCgCAEEEakGAEEkEQCADKAIMIAMoAgQoAgBqIAMoAggiAEEYdCAAQYD+A3FBCHRyIABBCHZBgP4DcSAAQRh2cnI2AgAgAygCBCIAIAAoAgBBBGo2AgALIANBEGokAAulEwESfyMAQSBrIgskACALIAA2AhwgC0EEaiIOIAsoAhwQMCMAQdAAayIGJAAgBiALQRBqIg82AkwgBiAONgJIIAZBPGoiEUGoCRAwIAZBMGoiECAOECwgDhAeQZsKEFMjAEEgayIAJAAgACAGQRhqIgo2AhggAEGACDYCFCAAQQg2AhAgACAAKAIYIgE2AhwgASAAQQ9qIABBDmoQKyAAKAIQBEAgACgCFBoLIAEgACgCFCAAKAIQEEwjAEEQayABNgIMIAAoAhwaIABBIGokACMAQaACayIBJAAgASAGQSRqIg02ApwCIAEgEDYCmAIgASAKNgKUAiABKAKYAiECIwBBEGsiACQAIAAgAjYCDCAAKAIMEClFIQIgAEEQaiQAAkAgAgRAIA1BlgoQMAwBCyABQRBqIgdBAEGAAhAqGiABKAKUAhAxIQIgASgClAIQHiEEIwBBsAJrIgAkACAAIAc2AqwCIAAgAjYCqAIgACAENgKkAiAAQQA2AqACIABBADYCnAIgAEEQakEAQYACECoaIABBADoADyAAQQA2AqACA0AgACgCoAJBgAJIBEAgACgCoAIiAiAAKAKsAmogAjoAACAAKAKgAiAAQRBqaiAAKAKoAiAAKAKgAiAAKAKkAnBqLQAAOgAAIAAgACgCoAJBAWo2AqACDAELCyAAQQA2AqACA0AgACgCoAJBgAJIBEAgACAAKAKcAiAAKAKgAiICIAAoAqwCai0AAGogAEEQaiACai0AAGpB/wFxNgKcAiAAIAAoAqwCIAAoAqACai0AADoADyAAKAKsAiAAKAKgAmogACgCrAIgACgCnAJqLQAAOgAAIAAoAqwCIAAoApwCaiAALQAPOgAAIAAgACgCoAJBAWo2AqACDAELCyAAQbACaiQAIAFBADoADyANEAcgASgCmAIQHiEAIwBBEGsiAiQAIAIgDTYCDCACIAA2AgggAigCCCEEAkAgBCACKAIMIgAQKSIDSwRAIwBBEGsiCSQAIAQgA2siBARAIAAQVyEDIAAQKSIFIARqIQggBCADIAVrSwRAIAAgAyAIIANrIAUgBRBFCyAAEAghDCMAQRBrIgMgDDYCDCAFIAMoAgxqIQUgAyQAIANBADoADyADQQ9qIRIDQCAEBEAgBSASLQAAOgAAIARBAWshBCAFQQFqIQUMAQsLIANBEGokACAAIAgQNSAJQQA6AA8jAEEQayIAIAggDGo2AgwgACAJQQ9qNgIIIAAoAgwgACgCCC0AADoAAAsgCUEQaiQADAELIAAgABAIIQAjAEEQayIDIAA2AgwgAygCDCAEEA4LIAJBEGokACABKAKYAhAxIQIgASgCmAIQHiEEIA0QMSEDIwBBMGsiACAHNgIsIAAgAjYCKCAAIAQ2AiQgACADNgIgIABBADYCHCAAQQA2AhggAEEANgIUIABBADYCECAAQQA2AhADQCAAKAIQIAAoAiRJBEAgACAAKAIcQQFqQf8BcTYCHCAAIAAoAhggACgCLCAAKAIcai0AAGpB/wFxNgIYIAAgACgCLCAAKAIcai0AADoADyAAKAIsIAAoAhxqIAAoAiwgACgCGGotAAA6AAAgACgCLCAAKAIYaiAALQAPOgAAIAAgACgCLCAAKAIcai0AACAAKAIsIAAoAhhqLQAAakH/AXE2AhQgACgCKCAAKAIQaiICIAAoAiwgACgCFGotAAAgAi0AAHM6AAAgACgCICAAKAIQaiAAKAIoIAAoAhBqLQAAOgAAIAAgACgCEEEBajYCEAwBCwsgAUEBOgAPIAEtAA9BAXFFBEAgDRAbCwsgAUGgAmokACAKEBsgBiAQEEYjAEEgayIBJAAgASAGQQxqIgk2AhwgASAGNgIYIAFBADoAFyAJEAcgAUEANgIQA0AgASgCECAGEB5JBEAgASAGIAEoAhAQQy0AADYCACMAQRBrIgMkACADIAE2AgxBACEEIwBBoAFrIgAkACAAIAFBBmoiCCICNgKUASAAQQk2ApgBIABBAEGQARAqIgBBfzYCTCAAQQg2AiQgAEF/NgJQIAAgAEGfAWo2AiwgACAAQZQBajYCVCACQQA6AAAjAEHQAWsiAiQAIAIgATYCzAEgAkGgAWoiBUEAQSgQKhogAiACKALMATYCyAECQEEAIAJByAFqIAJB0ABqIAUQLkEASA0AIAAoAkxBAE4gACgCACEFIAAoAkhBAEwEQCAAIAVBX3E2AgALAn8CQAJAIAAoAjBFBEAgAEHQADYCMCAAQQA2AhwgAEIANwMQIAAoAiwhBCAAIAI2AiwMAQsgACgCEA0BC0F/IAAQQA0BGgsgACACQcgBaiACQdAAaiACQaABahAuCyEHIAQEfyAAQQBBACAAKAIkEQUAGiAAQQA2AjAgACAENgIsIABBADYCHCAAKAIUGiAAQgA3AxBBAAUgBwsaIAAgACgCACAFQSBxcjYCAEUNAAsgAkHQAWokACAAQaABaiQAIANBEGokACAJIAgQWCABIAEoAhBBAWo2AhAMAQsLIAFBAToAFyABLQAXQQFxRQRAIAkQGwsgAUEgaiQAIwBBEGsiBCQAIAQgDzYCDCAEIBE2AgggBCAJNgIEIAQoAgQhASAEKAIIIQIjAEEQayIAJAAgACABNgIMIABBADYCCCAAIAI2AgQCfyAAKAIMIQMgACgCCCEFIAAoAgQQMSEIIAAoAgQQKSEBIwBBEGsiCiQAIAUgAxApIgJNBEACQCABIAMQVyIHIAJrTQRAIAFFDQEgAxAIIQcjAEEQayIMIAc2AgwgDCgCDCIHIAVqIAIgBUcEfyAFIAdqIgwgAWogDCACIAVrEDsg
if wasmBinaryFile.startswith("data:application/octet-stream;base64,"):
wasm_file = base64.b64decode(wasmBinaryFile.split(",")[1])
else:
wasm_file = Path(wasmBinaryFile).read_bytes()
self.wasm_file = wasm_file
self.store = Store(engine.JIT(Compiler))
self.module = Module(self.store, self.wasm_file)
self.import_object = ImportObject()
self.register()
self.asm = Instance(self.module, self.import_object)
self.exports = self.asm.exports
self.gen_export_object()
self.gen_func()
def gen_func(self):
func = {
"free": self.func["k"],
"malloc": self.func["j"],
}
self.func.update(func)
def gen_func(self):
func = {
"free": self.func["2zQ"],
"malloc": self.func["tCj"],
}
self.func.update(func)
def malloc(self, length):
ptr = self.func["malloc"](length)
self.ptrs.append(ptr)
return ptr
def free(self, ptr):
if isinstance(ptr, list):
for i in ptr:
self.free(i)
else:
self.func['free'](ptr)
def free_all(self):
self.free(self.ptrs)
self.ptrs = []
def ccall(self, func_name: str, returnType: 'type', *args):
def convertReturnValue(_ptr: int):
if returnType == str:
return self.UTF8ToString(_ptr)
elif returnType == bool:
return bool(returnType)
return _ptr
stack = 0
_args = []
for arg in args:
if isinstance(arg, str):
max_write_length = len(arg) + 64
ptr = self.malloc(max_write_length)
if len(arg) >= 30:
self.pr = ptr
self.stringToUTF8(arg, ptr, max_write_length)
_args.append(ptr)
elif isinstance(arg, list):
max_write_length = len(arg) + 8
ptr = self.malloc(max_write_length)
self.writeArrayToMemory(arg, ptr, max_write_length)
_args.append(ptr)
else:
_args.append(arg)
ptr = self.func[func_name](*_args)
ret = convertReturnValue(ptr)
return ret
@staticmethod
def _abort():
pass
def _emscripten_memcpy_big(self, dest: int, src: int, num: int = None):
if num is None:
num = len(self.memory.uint8_view()) - 1
self.memory.uint8_view()[dest:dest + num] = self.memory.uint8_view()[src:src + num]
def _emscripten_resize_heap(self, param_0: int) -> int:
return 0
def gettm(self) -> 'f64':
return float(time.time())
def EjW(self, param_0: int) -> int:
return 0
def gen_import_object(self):
a = {
"6Gj": Function(self.store, self._abort),
"xuX": Function(self.store, self._emscripten_memcpy_big),
"2fb": Function(self.store, self.gettm),
"EjW": Function(self.store, self.EjW),
}
tximport = {
"hc2": a,
}
return tximport
def gen_export_object(self):
func = dict()
for (k, v) in self.exports:
if isinstance(v, Function):
func[k] = v
elif isinstance(v, Memory):
setattr(self, "memory", v)
elif isinstance(v, Table):
setattr(self, "table", v)
# print(k, v.type)
self.func = func
def register(self):
for i in self.gen_import_object():
self.import_object.register(i, self.gen_import_object()[i])
def stringToUTF8(self, data: str, ptr: int, max_write_length: int):
_data = data.encode('utf-8')
write_length = len(_data)
if write_length == 0:
self.memory.uint8_view()[ptr] = 0
_data = _data + b'\0' * (max_write_length - write_length)
uint8 = self.memory.uint8_view(offset=ptr)
uint8[0:max_write_length] = _data
def UTF8ToString(self, ptr: int) -> str:
if ptr > 0:
_memory = self.memory.uint8_view(offset=ptr)
data = []
index = 0
while _memory[index] != 0:
data.append(_memory[index])
index += 1
return bytes(data).decode('utf-8')
else:
return ''
def writeArrayToMemory(self, array: list, ptr: int, max_write_length: int):
array = array + [0] * (max_write_length - len(array))
self.memory.uint8_view()[ptr, ptr + max_write_length] = array
def __del__(self):
self.free_all()
del self.ptrs
def gen_key(self, *args):
return self.ccall("otm", str, *args)
def ckey(self, ):
pass
def aesenc(self, data: str):
key = bytes.fromhex(self.key)
iv = bytes.fromhex(self.iv)
cipher = AES.new(key, AES.MODE_CBC, iv)
data = pad(data.encode('utf-8'), 16)
return cipher.encrypt(data).hex().upper()
def aesdec(self, data: str, key=None, iv=None):
key = bytes.fromhex(self.key)
iv = bytes.fromhex(self.iv)
cipher = AES.new(key, AES.MODE_CBC, iv)
data = bytes.fromhex(data)
return unpad(cipher.decrypt(data), 16).decode('utf-8')
@staticmethod
def get_hash(s):
h = 0
for c in s:
h = (h << 5) - h + ord(c)
return str(h & 0xffffffff)
def ckey81(self, vid, tm, appVer='3.5.57', guid='52f0ea142b32b633', platform="10201",
url="https://v.qq.com/x/cover/mzc00200b4jsdq6/l00469csvi7.html"):
url = url[0:48]
navigator = self.userAgent[0:48]
appCodeName = self.appCodeName
appName = self.appName
platforma = self.platform
s = "|" + "|".join(
[vid, tm, "mg3c3b04ba", appVer, guid, platform, url, navigator, url, appCodeName, appName, platforma,
"00"]) + "|"
s = "|" + self.get_hash(s) + s
return s
@staticmethod
def roundstr(mun: int = 32):
return ''.join(random.sample('ZYXWVUTSRQPONMLKJIHGFEDCBAzyxwvutsrqponmlkjihgfedcba1234567890', mun))
@staticmethod
def md5str(a: str):
return hashlib.md5(a.encode()).hexdigest()
def ckey85(self, vid, appVer="1.26.3", guid="", platform="10201",
h38=""):
data = {
"vid": vid,
"nonce": "",
"rand": "",
"appVer": appVer,
"guid": guid,
"platform": platform,
}
ts = str(int(time.time()))
data["ts"] = ts
nonce = self.roundstr(11)
rand = self.md5str(nonce + "1234")[:8]
data["rand"] = rand
data["nonce"] = nonce
ckey = json.dumps(data, separators=(',', ':'))
enc = self.aesenc(ckey)
ckey = "--01" + enc
return ckey
def ckey92(self, tm, vid, appVer="1.28.2", guid="", h38="",
url="https://v.qq.com/x/cover/mzc002006w8m6hk/u0047t48n6f.html", platform=10201,
l="{\"cp\":\"59zexbw7hg\",\"csal\":[\"m5h0zchrh5\"]}", r="{\"ea\":3,\"spa\":1,\"hwdrm\":4,\"hwacc\":1}"):
url = url[0:48]
s = url + "|mozilla/5.0 (windows nt 10.0; win64; x64) applew|https://v.qq.com/channel/tv|Mozilla|Netscape|Win32"
data = [platform, appVer, vid, "", guid, s, l,
r, h38, int(tm)]
ckey = self.gen_key(*data).split('|')
for i in ckey:
if ckey.count(i) == 1:
ckey = i
break
if i == ckey[-1]:
ckey = ckey[5]
return ckey
class TX:
def __init__(self, ck):
self.ckey = Txckey()
self.guid = None
self.h38 = None
self.ck = ck
self.key = "-----BEGIN RSA PRIVATE KEY-----\n" + "MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAOG98XlIossYaXk4RHWOZutQEo1wvC4GHMNFBsYhGVbJgPVAF7SM6rPIkC/efoJ9qHYPcYGhh5LdB/FZkBkj8neeKT76+7CZkMYoolYS5gGLg4IgxTDS7uVoKMRLXQSKJxfEGLDlGKZ+oqQmW7hVUMTsYnsn+6WTk+P4FRTWO2uPAgMBAAECgYBKBbDS5mCLXFvppeu86I8TBlSvEJKEPPjdhxrriRr3/GdPBE9BoxurDE9LgxfUzkOZQwMjUMZWACiEmavIsqLkvM2Ld7WCQ6oiO739xZkQsgX/M0X7f1lcldLB2kHEsglWWexGoK68KD99HufHK+6QAnIL+AVhpE7cDXCmtK++AQJBAPGO6as8+3Vnm10ruCPt23FmrZlxpKA5LVzW9m0adPFWbPMJJFvI0oI9eJVhMYa9uFcRhdO0YSyrkUKvoAS7OIECQQDvPPWvy719Z5cIjE2yFh4DKS5JRZnPZZia2XGnOotbwEt6SFFFqASyR5xfh+1gjbaJ/6mQlli0YWvjWK8ylRwPAkEA4epkFffBwer1Pi0+WbQCcUuzfnfvnL389ABDloSQ7ImE+cQKEiF+57nwBd1RwY+8UQodXIMuAuYuw+yXPvWOgQJAJyPQBjzM+ZFTEmDx7SrVKis4mWA7s8SpXNwqTfO0DQS+1Hi0YzMD4a75lF+GpH9K1/Tt5uvSA2DU59MAhsQCXQJABelyNLFk6bf4n8CYAlOCZ7JCh3pUfbZ5mkuj6VmROzjXzRrT9B/tezNK7nEeUstVTiwl/DMXYCCwVXkLCvq9Bw==\n" + "-----END RSA PRIVATE KEY-----\n"
self.logintoken = {
"access_token": "",
"appid": "",
"vusession": "",
"openid": "",
"vuserid": "",
"video_guid": "",
"main_login": ""
}
self.headers = {
"User-Agent": self.ckey.userAgent,
"Referer": "https://v.qq.com",
"Cookie": self.ck,
}
self.re = requests.session()
self.re.headers.update(self.headers)
self.login()
def login(self):
cookie = dealck(self.ck)
for i in self.logintoken:
self.logintoken[i] = cookie.get(i) or cookie.get("vqq_" + i)
self.h38 = cookie.get("_qimei_h38", "")
self.guid = cookie.get("video_guid", "")
url = 'https://access.video.qq.com/user/auth_refresh'
params = {
"vappid": "11059694",
"vsecret": "fdf61a6be0aad57132bc5cdf78ac30145b6cd2c1470b0cfe",
"type": "qq",
"g_tk": "",
"g_vstk": djb2Hash(self.logintoken["vusession"]),
"g_actk": djb2Hash(self.logintoken["access_token"]),
"_": str(int(time.time() * 1000)),
}
data = self.re.get(url, params=params).text
data = json.loads(data.split("=")[1])
access_token = data["access_token"]
vusession = data["vusession"]
self.ck = self.ck.replace(self.logintoken["access_token"], access_token)
self.ck = self.ck.replace(self.logintoken["vusession"], vusession)
ck=self.ck.replace("vqq_", "")
self.logintoken["access_token"] = access_token
self.logintoken["vusession"] = vusession
self.headers = {
"User-Agent": self.ckey.userAgent,
"Referer": "https://v.qq.com",
"Cookie": ck,
}
self.re.headers.update(self.headers)
updata_yaml("txck", self.ck)
def wv(self, pssh, lic_url):
def ke():
wvdecrypt = WvDecrypt(init_data_b64=pssh, cert_data_b64="",
device=deviceconfig.device_android_generic)
bcert_b64 = wvdecrypt.get_challenge()
lrs = sha1withrsa(self.key, bcert_b64)
data = [
10201,
"1.28.2",
"",
"",
self.guid,
"https://v.qq.com/x/cover/v2098lbuihuqs11/u00315w|mozilla/5.0 (windows nt 10.0; win64; x64) applew|https://v.qq.com/|Mozilla|Netscape|Win32",
'{"cp":"59zexbw7hg","csal":["m5h0zchrh5"],"lrs":"' + lrs + '"}',
'{"ea":3,"spa":1}',
self.h38,
int(time.time())
]
ckey = self.ckey.gen_key(*data).split('|')[5]
url = lic_url.split("?")[0]
dic["ckey"] = ckey
dic["encrypt_ver"] = "9.2"
dic['app_ver'] = "1.28.2"
self.re.headers["Referer"] = "https://v.qq.com/"
widevine_license = self.re.post(url=url, data=bcert_b64,
params=dic)
data = widevine_license.json()
code = data["code"]
if code != 0:
print(data)
return None
data = data["anc"]
data = self.dec_res(data, 2)
ke(data)
dic = dict(parse_qsl(urlsplit(lic_url).query))
dic["version"] = "1"
ke()
def dec_res(self, data: str, c=1):
data = json.loads(b64decode(data))
rc = b64decode(data["rc"])
aanc = b64decode(data["aanc"])
anc = b64decode(data["anc"])
key, iv = eval(rsa_dec(self.key, rc))["algo_params"]
algo_list = eval(aes_decrypt(key.encode(), aanc, iv.encode()).decode())["algo_list"]
key, iv = algo_list[0]["algo_params"]
if c == 1:
data = json.loads(aes_decrypt(key.encode(), anc, iv.encode()).decode())
else:
data = base64.b64encode(aes_decrypt(key.encode(), anc, iv.encode())).decode()
return data
def get(self, vid=None, url="https://v.qq.com/x/cover/mzc002007qle9m2/x0047r3k6wy.html", defn="hdr10"):
vid = url.split("/")[-1].split(".")[0] if vid is None else vid
tm = int(time.time())
ckey = self.ckey.ckey92(tm=tm, vid=vid, guid=self.guid, h38=self.h38, url=url)
params = {
"vid": vid,
"defn": defn,
"ehost": url,
"refer": url,
"platform": "10201",
"guid": self.guid,
"cKey": ckey,
"logintoken": json.dumps(self.logintoken, separators=(',', ':')),
"tm": tm,
"charge": "0",
"otype": "ojson",
"defnpayver": "3",
"spau": "1",
"spaudio": "0",
"spwm": "1",
"sphls": "2",
"host": "v.qq.com",
"sphttps": "1",
"encryptVer": "9.2",
"clip": "4",
"flowid": "",
"sdtfrom": "v1010",
"appVer": "1.28.2",
"unid": "",
"auth_from": "",
"auth_ext": "",
"fhdswitch": "0",
"dtype": "3",
"spsrt": "2",
"lang_code": "0",
"spvvpay": "1",
"spadseg": "3",
"spav1": "15",
"hevclv": "33",
"spsfrhdr": "0",
"spvideo": "0",
"spm3u8tag": "67",
"spmasterm3u8": "3",
"drm": "40"
}
response = self.re.get("https://h5vv6.video.qq.com/getinfo", params=params)
data = response.json()
if "anc" in data:
anc = data["anc"]
data = self.dec_res(anc)
return data
@staticmethod
def get_list(url):
def get_video_data(data, ret=[]):
cookies = {
"appid": "wxa75efa648b60994b",
"vversion_name": "8.2.95.1",
"video_bucketid": "4",
"video_omgid": ""
}
params = {
"video_appid": "3000002",
"guid": "",
"vplatform": "5",
"callerid": "3000002"
}
headers = {
"Accept-Encoding": "gzip,compress,br,deflate",
"Connection": "keep-alive",
"Host": "pbaccess.video.qq.com",
"Referer": "https://servicewechat.com/wxa75efa648b60994b/629/page-frame.html",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 17_0_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.42(0x18002a2e) NetType/WIFI Language/zh_CN",
"content-type": "application/json"
}
url = "https://pbaccess.video.qq.com/trpc.universal_backend_service.page_server_rpc.PageServer/GetPageData"
response = requests.post(url, headers=headers, cookies=cookies, params=params, data=data)
response_data = response.json()
module_list_datas = response_data['data']['module_list_datas']
for module_list_data in module_list_datas:
module_data = module_list_data['module_datas'][0]
if module_data['module_params']['module_type'] == "episode_list":
has_next = module_data['module_params']['has_next']
item_datas = module_data['item_data_lists']['item_datas']
for item_data in item_datas:
item_type = item_data['item_type']
if int(item_type) == 23:
print(item_data['item_params']['sub_title'])
continue
item_params = item_data['item_params']
play_type = item_params['play_type']
if int(play_type) != 1:
continue
cover_c_title = item_params['cover_c_title']
play_title = item_params['play_title']
vid = item_params['vid']
# defn = eval(item_params['defn'])
# defn = sorted(defn.items(), key=lambda x: x[1], reverse=True)
ret.append([cover_c_title, play_title, vid])
if has_next == "true":
next_page_context = module_data['module_params']['next_page_context']
data = {
"page_params": {
"page_type": "detail_operation",
"cid": "",
"page_id": "vsite_episode_list",
"page_context": next_page_context,
}
}
get_video_data(json.dumps(data, separators=(',', ':')), ret)
else:
break
return ret
cid = url.split("/")[5].split(".")[0]
data = {
"page_params": {
"page_type": "video_detail",
"cid": cid
}
}
data = get_video_data(json.dumps(data, separators=(',', ':')))
return data
def run(self, url=None):
url = input("输入url:") if url is None else url
list = self.get_list(url)
print(tabulate(list, headers=['cover_c_title', 'play_title', 'vid'], tablefmt="grid",
showindex=range(1, len(list) + 1)))
c = input("选择清晰度,多个用逗号隔开:")
c = c.split(",")
for i in c:
viddeodata = list[int(i) - 1]
c_title, title, vid = viddeodata
data = self.get(vid=vid, url=url)
fi = data['fl']['fi']
defn = []
for f in fi:
cname = f['cname']
name = f['name']
fs = f['fs']
size = get_size(fs)
fn = f['id']
defn.append([fn, name, cname, size, fs])
defn.sort(key=lambda x: x[-1], reverse=True)
print(tabulate(defn, headers=['id', 'fn', 'cname', 'size', "rsize"], tablefmt="grid",
showindex=range(1, len(defn) + 1)))
c = input("选择清晰度,多个用逗号隔开:")
c = c.split(",")
for i in c:
fn = defn[int(i) - 1][1]
rr = self.get(vid, url, fn) if fn != "hdr10" else data
vi = rr['vl']['vi'][0]
ui = vi['ul']['ui']
enc = vi.get("enc")
if enc == "1":
if "ckc" in vi:
lic_url = vi['ckc']
m3u8 = rr['play']['audiolist'][0]['m3u8']
pssh = m3u8.split("base64,")[1].split('",')[0]
print(pssh)
print("不支持wv")
#key = self.wv(pssh, lic_url)
elif "base" in vi:
tm = data['tm']
base = vi['base']
linkvid = vi['lnk']
appVer = "1.28.2"
platform = 10201
print("不支持chacha20")
'''
ckey = {"ChaCha20KeyBase64":"AAAAAA==","ChaCha20NonceBase64":"AAAAAA=="}
ChaCha20KeyBase64 = ckey["ChaCha20KeyBase64"]
ChaCha20NonceBase64 = ckey["ChaCha20NonceBase64"]
if ChaCha20KeyBase64.find("AAAAAA") != -1:
return None
'''
continue
m3u8url = ""
for a in ui:
m3u8url = a['url']
m3u8url = m3u8url if not m3u8url.endswith("/") else m3u8url + a['hls']['pt']
break
savepath = f"./download/tx/{c_title}/"
cmd = f"N_m3u8DL-RE.exe \"{m3u8url} \" --tmp-dir ./cache --save-name \"{title}\" --save-dir \"{savepath}\" --thread-count 16 --download-retry-count 30 --auto-select --check-segments-count "
with open("{}.bat".format(c_title), "a", encoding="utf-8") as f:
f.write("chcp 65001\n")
f.write(cmd)
f.write("\n")
print(f"已生成下载脚本{c_title}.bat 请运行下载")
if __name__ == '__main__':
ck = ""
tx = TX(ck)
tx.run()