File size: 7,847 Bytes
44bafb2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""
This module contains all the logic needed to find the signature functions.

YouTube's strategy to restrict downloading videos is to send a ciphered version
of the signature to the client, along with the decryption algorithm obfuscated
in JavaScript. For the clients to play the videos, JavaScript must take the
ciphered version, cycle it through a series of "transform functions," and then
sign the media URL with the output.

This module is responsible for (1) finding these "transform functions" and
(2) sending them to be interpreted by jsinterp.py.
"""
import logging
import re

from pytubefix.exceptions import RegexMatchError, InterpretationError
from pytubefix.jsinterp import JSInterpreter, extract_player_js_global_var

logger = logging.getLogger(__name__)


class Cipher:
    """Resolve and run the signature and throttling functions of a player script.

    The two function names are extracted from the JavaScript once, at
    construction time. The functions themselves are executed on demand
    through a ``JSInterpreter`` built from the same script.
    """

    def __init__(self, js: str, js_url: str):
        """Locate both transform functions and build the interpreter.

        :param str js:
            The contents of the base.js asset file.
        :param str js_url:
            Full base.js url; stored so interpretation failures can report it.
        :raises RegexMatchError:
            Propagated from the name-extraction helpers when no pattern
            matches the script.
        """
        self.js_url = js_url

        self.signature_function_name = get_initial_function_name(js, js_url)
        self.throttling_function_name = get_throttling_function_name(js, js_url)

        # Initialised to None and not touched again inside this class.
        self.calculated_n = None

        self.js_interpreter = JSInterpreter(js)

    def get_throttling(self, n: str) -> str:
        """Interpret the function that throttles download speed.
        :param str n:
            Contains the parameter that must be transformed.
        :rtype: str
        :returns:
            Returns the transformed value "n".
        :raises InterpretationError:
            If the interpreter fails while running the throttling function.
        """
        try:
            return self.js_interpreter.call_function(self.throttling_function_name, n)
        except Exception as exc:
            # `Exception` rather than a bare except: KeyboardInterrupt and
            # SystemExit must not be disguised as interpretation failures.
            # Chaining keeps the interpreter's own error visible.
            raise InterpretationError(js_url=self.js_url) from exc

    def get_signature(self, ciphered_signature: str) -> str:
        """Interpret the function that signs the streams.
            The lack of this signature generates the 403 forbidden error.
        :param str ciphered_signature:
           Contains the signature that must be transformed.
        :rtype: str
        :returns:
           Returns the correct stream signature.
        :raises InterpretationError:
            If the interpreter fails while running the signature function.
        """
        try:
            return self.js_interpreter.call_function(self.signature_function_name, ciphered_signature)
        except Exception as exc:
            # Same reasoning as in get_throttling: let interpreter-exit
            # signals through and keep the root cause attached.
            raise InterpretationError(js_url=self.js_url) from exc


def get_initial_function_name(js: str, js_url: str) -> str:
    """Find the name of the JavaScript function that computes the signature.

    The candidate regexes are tried in the order listed (the later groups
    are labelled old/obsolete). The ``sig`` group of the first pattern that
    matches anywhere in the script is returned.

    :param str js:
        The contents of the base.js asset file.
    :param str js_url:
        Full base.js url; only used in the error raised on failure.
    :rtype: str
    :returns:
        Function name from regex match
    :raises RegexMatchError:
        If none of the candidate patterns matches.
    """
    candidate_patterns = (
        r'(?P<sig>[a-zA-Z0-9_$]+)\s*=\s*function\(\s*(?P<arg>[a-zA-Z0-9_$]+)\s*\)\s*{\s*(?P=arg)\s*=\s*(?P=arg)\.split\(\s*[a-zA-Z0-9_\$\"\[\]]+\s*\)\s*;\s*[^}]+;\s*return\s+(?P=arg)\.join\(\s*[a-zA-Z0-9_\$\"\[\]]+\s*\)',
        r'(?:\b|[^a-zA-Z0-9_$])(?P<sig>[a-zA-Z0-9_$]{2,})\s*=\s*function\(\s*a\s*\)\s*{\s*a\s*=\s*a\.split\(\s*""\s*\)(?:;[a-zA-Z0-9_$]{2}\.[a-zA-Z0-9_$]{2}\(a,\d+\))?',
        r'\b(?P<var>[a-zA-Z0-9_$]+)&&\((?P=var)=(?P<sig>[a-zA-Z0-9_$]{2,})\(decodeURIComponent\((?P=var)\)\)',
        # Old patterns
        r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(',
        r'\b[a-zA-Z0-9]+\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(',
        r'\bm=(?P<sig>[a-zA-Z0-9$]{2,})\(decodeURIComponent\(h\.s\)\)',
        # Obsolete patterns
        r'("|\')signature\1\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(',
        r'\.sig\|\|(?P<sig>[a-zA-Z0-9$]+)\(',
        r'yt\.akamaized\.net/\)\s*\|\|\s*.*?\s*[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?:encodeURIComponent\s*\()?\s*(?P<sig>[a-zA-Z0-9$]+)\(',
        r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(',
        r'\bc\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*\([^)]*\)\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(',
    )

    logger.debug("looking for signature cipher name")
    for candidate in candidate_patterns:
        found = re.search(candidate, js)
        if found is None:
            # This layout is not present in the script; try the next one.
            continue

        sig = found.group('sig')
        logger.debug("finished regex search, matched: %s", candidate)
        logger.debug(f'Signature cipher function name: {sig}')
        return sig

    # Every known layout failed.
    raise RegexMatchError(
        caller="get_initial_function_name", pattern=f"multiple in {js_url}"
    )


def get_throttling_function_name(js: str, js_url: str) -> str:
    """Extract the name of the function that computes the throttling parameter.

    Two strategies are tried in order:

    1. Interpret the player's global array, find the entry ending in
       ``_w8_`` and look for the function whose ``catch`` block returns that
       entry concatenated with the function's own argument.
    2. Fall back to a regex over the call site where the "n" value is
       transformed. The match is either a direct call (``name(x)``), in
       which case ``name`` is returned, or an indexed call (``name[idx](x)``),
       in which case the ``var name=[...]`` declaration is looked up and the
       element at ``idx`` is returned.

    :param str js:
        The contents of the base.js asset file.
    :param str js_url:
        Full base.js url
    :rtype: str
    :returns:
        The name of the function used to compute the throttling parameter.
    :raises RegexMatchError:
        If the fallback regex does not match, or if it matches an indexed
        call whose array declaration cannot be found or is too short.
    """

    logger.debug("looking for nsig name")
    try:
        # Extracts the function name based on the global array
        global_obj, varname, code = extract_player_js_global_var(js)
        if global_obj and varname and code:
            logger.debug(f"Global Obj name is: {varname}")
            global_obj = JSInterpreter(js).interpret_expression(code, {}, 100)
            logger.debug("Successfully interpreted global object")
            for k, v in enumerate(global_obj):
                # Skip entries that are not strings instead of letting an
                # AttributeError abort the scan of the remaining entries.
                if not isinstance(v, str) or not v.endswith('_w8_'):
                    continue

                logger.debug(f"_w8_ found in index {k}")
                pattern = r'''(?xs)
                            [;\n](?:
                                (?P<f>function\s+)|
                                (?:var\s+)?
                            )(?P<funcname>[a-zA-Z0-9_$]+)\s*(?(f)|=\s*function\s*)
                            \((?P<argname>[a-zA-Z0-9_$]+)\)\s*\{
                            (?:(?!\};(?![\]\)])).)+
                            \}\s*catch\(\s*[a-zA-Z0-9_$]+\s*\)\s*
                            \{\s*return\s+%s\[%d\]\s*\+\s*(?P=argname)\s*\}\s*return\s+[^}]+\}[;\n]
                        '''  % (re.escape(varname), k)
                func_name = re.search(pattern, js)
                if func_name:
                    n_func = func_name.group("funcname")
                    logger.debug(f"Nfunc name is: {n_func}")
                    return n_func
    except Exception:
        # Best effort only: any failure here means "use the regex fallback".
        # `Exception` (not a bare except) lets KeyboardInterrupt/SystemExit
        # through, and the traceback is logged instead of silently dropped.
        logger.debug(
            "Global-array nsig lookup failed, falling back to regex",
            exc_info=True,
        )

    pattern = r'''(?x)
            (?:
                \.get\("n"\)\)&&\(b=|
                (?:
                    b=String\.fromCharCode\(110\)|
                    (?P<str_idx>[a-zA-Z0-9_$.]+)&&\(b="nn"\[\+(?P=str_idx)\]
                )
                (?:
                    ,[a-zA-Z0-9_$]+\(a\))?,c=a\.
                    (?:
                        get\(b\)|
                        [a-zA-Z0-9_$]+\[b\]\|\|null
                    )\)&&\(c=|
                \b(?P<var>[a-zA-Z0-9_$]+)=
            )(?P<nfunc>[a-zA-Z0-9_$]+)(?:\[(?P<idx>\d+)\])?\([a-zA-Z]\)
            (?(var),[a-zA-Z0-9_$]+\.set\((?:"n+"|[a-zA-Z0-9_$]+)\,(?P=var)\))'''

    logger.debug('Finding throttling function name')

    function_match = re.search(pattern, js)
    if not function_match:
        raise RegexMatchError(
            caller="get_throttling_function_name", pattern=f"{pattern} in {js_url}"
        )

    logger.debug("finished regex search, matched: %s", pattern)

    func = function_match.group('nfunc')
    idx = function_match.group('idx')

    logger.debug(f'func is: {func}')
    logger.debug(f'idx is: {idx}')

    logger.debug('Checking throttling function name')
    if idx is None:
        # Direct call: the captured identifier already is the function name.
        # Previously this case fell through to a RegexMatchError even though
        # the pattern had matched.
        logger.debug(f'Throttling function name is: {func}')
        return func

    # Indexed call: `func` names an array declared as `var func=[a,b,...];`.
    n_func_check_pattern = fr'var {re.escape(func)}\s*=\s*\[(.+?)];'
    n_func_found = re.search(n_func_check_pattern, js)

    if n_func_found:
        # Honour the index instead of returning the whole array body; for
        # the single-element case this yields the same name as before.
        candidates = [name.strip() for name in n_func_found.group(1).split(',')]
        position = int(idx)  # `idx` matched \d+, so this cannot fail
        if position < len(candidates) and candidates[position]:
            throttling_function = candidates[position]
            logger.debug(f'Throttling function name is: {throttling_function}')
            return throttling_function

    raise RegexMatchError(
        caller="get_throttling_function_name", pattern=f"{n_func_check_pattern} in {js_url}"
    )