File size: 7,847 Bytes
44bafb2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
"""
This module contains all the logic needed to find the signature functions.
YouTube's strategy to restrict downloading videos is to send a ciphered version
of the signature to the client, along with the decryption algorithm obfuscated
in JavaScript. For the clients to play the videos, JavaScript must take the
ciphered version, cycle it through a series of "transform functions," and then
signs the media URL with the output.
This module is responsible for (1) finding these "transformations
functions" (2) sends them to be interpreted by jsinterp.py
"""
import logging
import re
from pytubefix.exceptions import RegexMatchError, InterpretationError
from pytubefix.jsinterp import JSInterpreter, extract_player_js_global_var
logger = logging.getLogger(__name__)
class Cipher:
def __init__(self, js: str, js_url: str):
self.js_url = js_url
self.signature_function_name = get_initial_function_name(js, js_url)
self.throttling_function_name = get_throttling_function_name(js, js_url)
self.calculated_n = None
self.js_interpreter = JSInterpreter(js)
def get_throttling(self, n: str):
"""Interpret the function that throttles download speed.
:param str n:
Contains the parameter that must be transformed.
:rtype: str
:returns:
Returns the transformed value "n".
"""
try:
return self.js_interpreter.call_function(self.throttling_function_name, n)
except:
raise InterpretationError(js_url=self.js_url)
def get_signature(self, ciphered_signature: str) -> str:
"""interprets the function that signs the streams.
The lack of this signature generates the 403 forbidden error.
:param str ciphered_signature:
Contains the signature that must be transformed.
:rtype: str
:returns:
Returns the correct stream signature.
"""
try:
return self.js_interpreter.call_function(self.signature_function_name, ciphered_signature)
except:
raise InterpretationError(js_url=self.js_url)
def get_initial_function_name(js: str, js_url: str) -> str:
"""Extract the name of the function responsible for computing the signature.
:param str js:
The contents of the base.js asset file.
:param str js_url:
Full base.js url
:rtype: str
:returns:
Function name from regex match
"""
function_patterns = [
r'(?P<sig>[a-zA-Z0-9_$]+)\s*=\s*function\(\s*(?P<arg>[a-zA-Z0-9_$]+)\s*\)\s*{\s*(?P=arg)\s*=\s*(?P=arg)\.split\(\s*[a-zA-Z0-9_\$\"\[\]]+\s*\)\s*;\s*[^}]+;\s*return\s+(?P=arg)\.join\(\s*[a-zA-Z0-9_\$\"\[\]]+\s*\)',
r'(?:\b|[^a-zA-Z0-9_$])(?P<sig>[a-zA-Z0-9_$]{2,})\s*=\s*function\(\s*a\s*\)\s*{\s*a\s*=\s*a\.split\(\s*""\s*\)(?:;[a-zA-Z0-9_$]{2}\.[a-zA-Z0-9_$]{2}\(a,\d+\))?',
r'\b(?P<var>[a-zA-Z0-9_$]+)&&\((?P=var)=(?P<sig>[a-zA-Z0-9_$]{2,})\(decodeURIComponent\((?P=var)\)\)',
# Old patterns
r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(',
r'\b[a-zA-Z0-9]+\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(',
r'\bm=(?P<sig>[a-zA-Z0-9$]{2,})\(decodeURIComponent\(h\.s\)\)',
# Obsolete patterns
r'("|\')signature\1\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(',
r'\.sig\|\|(?P<sig>[a-zA-Z0-9$]+)\(',
r'yt\.akamaized\.net/\)\s*\|\|\s*.*?\s*[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?:encodeURIComponent\s*\()?\s*(?P<sig>[a-zA-Z0-9$]+)\(',
r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(',
r'\bc\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*\([^)]*\)\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\('
]
logger.debug("looking for signature cipher name")
for pattern in function_patterns:
regex = re.compile(pattern)
function_match = regex.search(js)
if function_match:
sig = function_match.group('sig')
logger.debug("finished regex search, matched: %s", pattern)
logger.debug(f'Signature cipher function name: {sig}')
return sig
raise RegexMatchError(
caller="get_initial_function_name", pattern=f"multiple in {js_url}"
)
def get_throttling_function_name(js: str, js_url: str) -> str:
"""Extract the name of the function that computes the throttling parameter.
:param str js:
The contents of the base.js asset file.
:param str js_url:
Full base.js url
:rtype: str
:returns:
The name of the function used to compute the throttling parameter.
"""
logger.debug("looking for nsig name")
try:
# Extracts the function name based on the global array
global_obj, varname, code = extract_player_js_global_var(js)
if global_obj and varname and code:
logger.debug(f"Global Obj name is: {varname}")
global_obj = JSInterpreter(js).interpret_expression(code, {}, 100)
logger.debug("Successfully interpreted global object")
for k, v in enumerate(global_obj):
if v.endswith('_w8_'):
logger.debug(f"_w8_ found in index {k}")
pattern = r'''(?xs)
[;\n](?:
(?P<f>function\s+)|
(?:var\s+)?
)(?P<funcname>[a-zA-Z0-9_$]+)\s*(?(f)|=\s*function\s*)
\((?P<argname>[a-zA-Z0-9_$]+)\)\s*\{
(?:(?!\};(?![\]\)])).)+
\}\s*catch\(\s*[a-zA-Z0-9_$]+\s*\)\s*
\{\s*return\s+%s\[%d\]\s*\+\s*(?P=argname)\s*\}\s*return\s+[^}]+\}[;\n]
''' % (re.escape(varname), k)
func_name = re.search(pattern, js)
if func_name:
n_func = func_name.group("funcname")
logger.debug(f"Nfunc name is: {n_func}")
return n_func
except:
pass
pattern = r'''(?x)
(?:
\.get\("n"\)\)&&\(b=|
(?:
b=String\.fromCharCode\(110\)|
(?P<str_idx>[a-zA-Z0-9_$.]+)&&\(b="nn"\[\+(?P=str_idx)\]
)
(?:
,[a-zA-Z0-9_$]+\(a\))?,c=a\.
(?:
get\(b\)|
[a-zA-Z0-9_$]+\[b\]\|\|null
)\)&&\(c=|
\b(?P<var>[a-zA-Z0-9_$]+)=
)(?P<nfunc>[a-zA-Z0-9_$]+)(?:\[(?P<idx>\d+)\])?\([a-zA-Z]\)
(?(var),[a-zA-Z0-9_$]+\.set\((?:"n+"|[a-zA-Z0-9_$]+)\,(?P=var)\))'''
logger.debug('Finding throttling function name')
regex = re.compile(pattern)
function_match = regex.search(js)
if function_match:
logger.debug("finished regex search, matched: %s", pattern)
func = function_match.group('nfunc')
idx = function_match.group('idx')
logger.debug(f'func is: {func}')
logger.debug(f'idx is: {idx}')
logger.debug('Checking throttling function name')
if idx:
n_func_check_pattern = fr'var {re.escape(func)}\s*=\s*\[(.+?)];'
n_func_found = re.search(n_func_check_pattern, js)
if n_func_found:
throttling_function = n_func_found.group(1)
logger.debug(f'Throttling function name is: {throttling_function}')
return throttling_function
raise RegexMatchError(
caller="get_throttling_function_name", pattern=f"{n_func_check_pattern} in {js_url}"
)
raise RegexMatchError(
caller="get_throttling_function_name", pattern=f"{pattern} in {js_url}"
)
|