""" This module contains all the logic needed to find the signature functions. YouTube's strategy to restrict downloading videos is to send a ciphered version of the signature to the client, along with the decryption algorithm obfuscated in JavaScript. For the clients to play the videos, JavaScript must take the ciphered version, cycle it through a series of "transform functions," and then signs the media URL with the output. This module is responsible for (1) finding these "transformations functions" (2) sends them to be interpreted by jsinterp.py """ import logging import re from pytubefix.exceptions import RegexMatchError, InterpretationError from pytubefix.jsinterp import JSInterpreter, extract_player_js_global_var logger = logging.getLogger(__name__) class Cipher: def __init__(self, js: str, js_url: str): self.js_url = js_url self.signature_function_name = get_initial_function_name(js, js_url) self.throttling_function_name = get_throttling_function_name(js, js_url) self.calculated_n = None self.js_interpreter = JSInterpreter(js) def get_throttling(self, n: str): """Interpret the function that throttles download speed. :param str n: Contains the parameter that must be transformed. :rtype: str :returns: Returns the transformed value "n". """ try: return self.js_interpreter.call_function(self.throttling_function_name, n) except: raise InterpretationError(js_url=self.js_url) def get_signature(self, ciphered_signature: str) -> str: """interprets the function that signs the streams. The lack of this signature generates the 403 forbidden error. :param str ciphered_signature: Contains the signature that must be transformed. :rtype: str :returns: Returns the correct stream signature. """ try: return self.js_interpreter.call_function(self.signature_function_name, ciphered_signature) except: raise InterpretationError(js_url=self.js_url) def get_initial_function_name(js: str, js_url: str) -> str: """Extract the name of the function responsible for computing the signature. :param str js: The contents of the base.js asset file. :param str js_url: Full base.js url :rtype: str :returns: Function name from regex match """ function_patterns = [ r'(?P[a-zA-Z0-9_$]+)\s*=\s*function\(\s*(?P[a-zA-Z0-9_$]+)\s*\)\s*{\s*(?P=arg)\s*=\s*(?P=arg)\.split\(\s*[a-zA-Z0-9_\$\"\[\]]+\s*\)\s*;\s*[^}]+;\s*return\s+(?P=arg)\.join\(\s*[a-zA-Z0-9_\$\"\[\]]+\s*\)', r'(?:\b|[^a-zA-Z0-9_$])(?P[a-zA-Z0-9_$]{2,})\s*=\s*function\(\s*a\s*\)\s*{\s*a\s*=\s*a\.split\(\s*""\s*\)(?:;[a-zA-Z0-9_$]{2}\.[a-zA-Z0-9_$]{2}\(a,\d+\))?', r'\b(?P[a-zA-Z0-9_$]+)&&\((?P=var)=(?P[a-zA-Z0-9_$]{2,})\(decodeURIComponent\((?P=var)\)\)', # Old patterns r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P[a-zA-Z0-9$]+)\(', r'\b[a-zA-Z0-9]+\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P[a-zA-Z0-9$]+)\(', r'\bm=(?P[a-zA-Z0-9$]{2,})\(decodeURIComponent\(h\.s\)\)', # Obsolete patterns r'("|\')signature\1\s*,\s*(?P[a-zA-Z0-9$]+)\(', r'\.sig\|\|(?P[a-zA-Z0-9$]+)\(', r'yt\.akamaized\.net/\)\s*\|\|\s*.*?\s*[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?:encodeURIComponent\s*\()?\s*(?P[a-zA-Z0-9$]+)\(', r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?P[a-zA-Z0-9$]+)\(', r'\bc\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*\([^)]*\)\s*\(\s*(?P[a-zA-Z0-9$]+)\(' ] logger.debug("looking for signature cipher name") for pattern in function_patterns: regex = re.compile(pattern) function_match = regex.search(js) if function_match: sig = function_match.group('sig') logger.debug("finished regex search, matched: %s", pattern) logger.debug(f'Signature cipher function name: {sig}') return sig raise RegexMatchError( caller="get_initial_function_name", pattern=f"multiple in {js_url}" ) def get_throttling_function_name(js: str, js_url: str) -> str: """Extract the name of the function that computes the throttling parameter. :param str js: The contents of the base.js asset file. :param str js_url: Full base.js url :rtype: str :returns: The name of the function used to compute the throttling parameter. """ logger.debug("looking for nsig name") try: # Extracts the function name based on the global array global_obj, varname, code = extract_player_js_global_var(js) if global_obj and varname and code: logger.debug(f"Global Obj name is: {varname}") global_obj = JSInterpreter(js).interpret_expression(code, {}, 100) logger.debug("Successfully interpreted global object") for k, v in enumerate(global_obj): if v.endswith('_w8_'): logger.debug(f"_w8_ found in index {k}") pattern = r'''(?xs) [;\n](?: (?Pfunction\s+)| (?:var\s+)? )(?P[a-zA-Z0-9_$]+)\s*(?(f)|=\s*function\s*) \((?P[a-zA-Z0-9_$]+)\)\s*\{ (?:(?!\};(?![\]\)])).)+ \}\s*catch\(\s*[a-zA-Z0-9_$]+\s*\)\s* \{\s*return\s+%s\[%d\]\s*\+\s*(?P=argname)\s*\}\s*return\s+[^}]+\}[;\n] ''' % (re.escape(varname), k) func_name = re.search(pattern, js) if func_name: n_func = func_name.group("funcname") logger.debug(f"Nfunc name is: {n_func}") return n_func except: pass pattern = r'''(?x) (?: \.get\("n"\)\)&&\(b=| (?: b=String\.fromCharCode\(110\)| (?P[a-zA-Z0-9_$.]+)&&\(b="nn"\[\+(?P=str_idx)\] ) (?: ,[a-zA-Z0-9_$]+\(a\))?,c=a\. (?: get\(b\)| [a-zA-Z0-9_$]+\[b\]\|\|null )\)&&\(c=| \b(?P[a-zA-Z0-9_$]+)= )(?P[a-zA-Z0-9_$]+)(?:\[(?P\d+)\])?\([a-zA-Z]\) (?(var),[a-zA-Z0-9_$]+\.set\((?:"n+"|[a-zA-Z0-9_$]+)\,(?P=var)\))''' logger.debug('Finding throttling function name') regex = re.compile(pattern) function_match = regex.search(js) if function_match: logger.debug("finished regex search, matched: %s", pattern) func = function_match.group('nfunc') idx = function_match.group('idx') logger.debug(f'func is: {func}') logger.debug(f'idx is: {idx}') logger.debug('Checking throttling function name') if idx: n_func_check_pattern = fr'var {re.escape(func)}\s*=\s*\[(.+?)];' n_func_found = re.search(n_func_check_pattern, js) if n_func_found: throttling_function = n_func_found.group(1) logger.debug(f'Throttling function name is: {throttling_function}') return throttling_function raise RegexMatchError( caller="get_throttling_function_name", pattern=f"{n_func_check_pattern} in {js_url}" ) raise RegexMatchError( caller="get_throttling_function_name", pattern=f"{pattern} in {js_url}" )