|
|
|
|
|
import base64 |
|
import logging |
|
from enum import Enum |
|
from typing import Optional |
|
from collections.abc import Callable |
|
from urllib.request import Request, urlopen |
|
|
|
from pytubefix.sabr.core.UMP import UMP |
|
from pytubefix.monostate import Monostate |
|
from pytubefix.exceptions import SABRError |
|
from pytubefix.sabr.video_streaming.sabr_error import SabrError |
|
from pytubefix.sabr.video_streaming.media_header import MediaHeader |
|
from pytubefix.sabr.core.chunked_data_buffer import ChunkedDataBuffer |
|
from pytubefix.sabr.video_streaming.sabr_redirect import SabrRedirect |
|
from pytubefix.sabr.video_streaming.playback_cookie import PlaybackCookie |
|
from pytubefix.sabr.video_streaming.next_request_policy import NextRequestPolicy |
|
from pytubefix.sabr.video_streaming.stream_protection_status import StreamProtectionStatus |
|
from pytubefix.sabr.video_streaming.video_playback_abr_request import VideoPlaybackAbrRequest |
|
from pytubefix.sabr.video_streaming.format_initialization_metadata import FormatInitializationMetadata |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class PART(Enum): |
|
ONESIE_HEADER = 10 |
|
ONESIE_DATA = 11 |
|
MEDIA_HEADER = 20 |
|
MEDIA = 21 |
|
MEDIA_END = 22 |
|
LIVE_METADATA = 31 |
|
HOSTNAME_CHANGE_HINT = 32 |
|
LIVE_METADATA_PROMISE = 33 |
|
LIVE_METADATA_PROMISE_CANCELLATION = 34 |
|
NEXT_REQUEST_POLICY = 35 |
|
USTREAMER_VIDEO_AND_FORMAT_DATA = 36 |
|
FORMAT_SELECTION_CONFIG = 37 |
|
USTREAMER_SELECTED_MEDIA_STREAM = 38 |
|
FORMAT_INITIALIZATION_METADATA = 42 |
|
SABR_REDIRECT = 43 |
|
SABR_ERROR = 44 |
|
SABR_SEEK = 45 |
|
RELOAD_PLAYER_RESPONSE = 46 |
|
PLAYBACK_START_POLICY = 47 |
|
ALLOWED_CACHED_FORMATS = 48 |
|
START_BW_SAMPLING_HINT = 49 |
|
PAUSE_BW_SAMPLING_HINT = 50 |
|
SELECTABLE_FORMATS = 51 |
|
REQUEST_IDENTIFIER = 52 |
|
REQUEST_CANCELLATION_POLICY = 53 |
|
ONESIE_PREFETCH_REJECTION = 54 |
|
TIMELINE_CONTEXT = 55 |
|
REQUEST_PIPELINING = 56 |
|
SABR_CONTEXT_UPDATE = 57 |
|
STREAM_PROTECTION_STATUS = 58 |
|
SABR_CONTEXT_SENDING_POLICY = 59 |
|
LAWNMOWER_POLICY = 60 |
|
SABR_ACK = 61 |
|
END_OF_TRACK = 62 |
|
CACHE_LOAD_POLICY = 63 |
|
LAWNMOWER_MESSAGING_POLICY = 64 |
|
PREWARM_CONNECTION = 65 |
|
PLAYBACK_DEBUG_INFO = 66 |
|
SNACKBAR_MESSAGE = 67 |
|
|
|
|
|
class ServerAbrStream: |
|
def __init__(self, stream, write_chunk: Callable, monostate: Monostate): |
|
|
|
self.stream = stream |
|
self.write_chunk = write_chunk |
|
self.youtube = monostate.youtube |
|
self.po_token = self.stream.po_token |
|
self.server_abr_streaming_url = self.stream.url |
|
self.video_playback_ustreamer_config = self.stream.video_playback_ustreamer_config |
|
self.totalDurationMs = int(self.stream.durationMs) |
|
self.bytes_remaining = self.stream.filesize |
|
self.initialized_formats = [] |
|
self.formats_by_key = {} |
|
self.playback_cookie = None |
|
self.header_id_to_format_key_map = {} |
|
self.previous_sequences = {} |
|
self.RELOAD = False |
|
self.maximum_reload_attempt = 3 |
|
|
|
|
|
def emit(self, data): |
|
for formatId in data['initialized_formats']: |
|
if formatId['formatId']['itag'] == self.stream.itag: |
|
media_chunks = formatId['mediaChunks'] |
|
for chunk in media_chunks: |
|
self.bytes_remaining -= len(chunk) |
|
self.write_chunk(chunk, self.bytes_remaining) |
|
|
|
def start(self): |
|
|
|
audio_format = [{'itag': self.stream.itag, |
|
'lastModified': int(self.stream.last_Modified), |
|
'xtags': self.stream.xtags}] if self.stream.type == 'audio' else [] |
|
|
|
|
|
video_format = [{'itag': self.stream.itag, |
|
'lastModified': int(self.stream.last_Modified), |
|
'xtags': self.stream.xtags}] if self.stream.type == 'video' else [] |
|
|
|
client_abr_state = { |
|
'lastManualDirection': 0, |
|
'timeSinceLastManualFormatSelectionMs': 0, |
|
'lastManualSelectedResolution': int(self.stream.resolution.replace('p', '')) if video_format else 720, |
|
'stickyResolution': int(self.stream.resolution.replace('p', '')) if video_format else 720, |
|
'playerTimeMs': 0, |
|
'visibility': 0, |
|
'drcEnabled': self.stream.is_drc, |
|
|
|
'enabledTrackTypesBitfield': 0 if video_format else 1 |
|
} |
|
while client_abr_state['playerTimeMs'] < self.totalDurationMs and self.maximum_reload_attempt > 0: |
|
data = self.fetch_media(client_abr_state, audio_format, video_format) |
|
|
|
if data.get("sabr_error", None): |
|
logger.debug(data.get('sabr_error').type) |
|
self.reload() |
|
|
|
self.emit(data) |
|
|
|
if client_abr_state['enabledTrackTypesBitfield'] == 0: |
|
main_format = next((fmt for fmt in data['initialized_formats'] if "video" in (fmt.get("mimeType") or "")), |
|
None) |
|
else: |
|
main_format = data['initialized_formats'][0] if data['initialized_formats'] else None |
|
|
|
for fmt in data['initialized_formats']: |
|
format_key = fmt["formatKey"] |
|
sequence_numbers = [seq.get("sequenceNumber", 0) for seq in fmt["sequenceList"]] |
|
self.previous_sequences[format_key] = sequence_numbers |
|
|
|
|
|
if ( not self.RELOAD and ( |
|
main_format is None or |
|
("sequenceList" in main_format and not main_format.get("sequenceList", None)) |
|
) |
|
): |
|
logger.debug("The Abr server did not return any chunks") |
|
self.reload() |
|
|
|
if self.maximum_reload_attempt > 0 and self.RELOAD: |
|
self.RELOAD = False |
|
continue |
|
elif self.maximum_reload_attempt <= 0: |
|
raise SABRError("SABR Maximum reload attempts reached") |
|
|
|
if ( |
|
not main_format or |
|
main_format["sequenceCount"] == main_format["sequenceList"][-1].get("sequenceNumber") |
|
): |
|
break |
|
|
|
total_sequence_duration = sum(seq.get("durationMs", 0) for seq in main_format["sequenceList"]) |
|
client_abr_state['playerTimeMs'] += total_sequence_duration |
|
|
|
def fetch_media(self, client_abr_state, audio_format, video_format): |
|
body = VideoPlaybackAbrRequest.encode({ |
|
'clientAbrState': client_abr_state, |
|
'selectedAudioFormatIds': audio_format, |
|
'selectedVideoFormatIds': video_format, |
|
'selectedFormatIds': [fmt["formatId"] for fmt in self.initialized_formats], |
|
'videoPlaybackUstreamerConfig': self.base64_to_u8(self.video_playback_ustreamer_config), |
|
'streamerContext': { |
|
'field5': [], |
|
'field6': [], |
|
'poToken': self.base64_to_u8(self.po_token) if self.po_token else None, |
|
'playbackCookie': PlaybackCookie.encode(self.playback_cookie).finish() if self.playback_cookie else None, |
|
'clientInfo': { |
|
'clientName': 1, |
|
'clientVersion': '2.20250523.01.00', |
|
'osName': 'Windows', |
|
'osVersion': '10.0', |
|
'platform': 'DESKTOP' |
|
} |
|
}, |
|
'bufferedRanges': [fmt["_state"] for fmt in self.initialized_formats], |
|
'field1000': [] |
|
}).finish() |
|
|
|
base_headers = { |
|
"User-Agent": "Mozilla/5.0", "accept-language": "en-US,en", "Content-Type": "application/vnd.yt-ump", |
|
} |
|
request = Request(self.server_abr_streaming_url, headers=base_headers, method="POST", data=bytes(body)) |
|
return self.parse_ump_response(bytes(urlopen(request).read())) |
|
|
|
def parse_ump_response(self, response): |
|
self.header_id_to_format_key_map.clear() |
|
for k, v in enumerate(self.initialized_formats): |
|
self.initialized_formats[k]['sequenceList'] = [] |
|
self.initialized_formats[k]['mediaChunks'] = [] |
|
|
|
sabr_error: Optional[SabrError] = None |
|
sabr_redirect: Optional[SabrRedirect] = None |
|
stream_protection_status: Optional[StreamProtectionStatus] = None |
|
|
|
ump = UMP(ChunkedDataBuffer([response])) |
|
|
|
def callback(part): |
|
data = list(part['data'].chunks[0] if part['data'].chunks else []) |
|
|
|
if part['type'] == PART.MEDIA_HEADER.value: |
|
self.process_media_header(data) |
|
|
|
elif part['type'] == PART.MEDIA.value: |
|
self.process_media_data(part['data']) |
|
|
|
elif part['type'] == PART.MEDIA_END.value: |
|
self.process_end_of_media(part['data']) |
|
|
|
elif part['type'] == PART.NEXT_REQUEST_POLICY.value: |
|
self.process_next_request_policy(data) |
|
|
|
elif part['type'] == PART.FORMAT_INITIALIZATION_METADATA.value: |
|
self.process_format_initialization(data) |
|
|
|
elif part['type'] == PART.SABR_ERROR.value: |
|
nonlocal sabr_error |
|
sabr_error = SabrError.decode(data) |
|
|
|
elif part['type'] == PART.SABR_REDIRECT.value: |
|
nonlocal sabr_redirect |
|
sabr_redirect = self.process_sabr_redirect(data) |
|
logger.debug("SABR_REDIRECT") |
|
|
|
elif part['type'] == PART.STREAM_PROTECTION_STATUS.value: |
|
nonlocal stream_protection_status |
|
stream_protection_status = StreamProtectionStatus.decode(data) |
|
|
|
elif part['type'] == PART.RELOAD_PLAYER_RESPONSE.value: |
|
logger.debug("RELOAD_PLAYER_RESPONSE") |
|
self.reload() |
|
|
|
elif part["type"] == PART.PLAYBACK_START_POLICY.value: |
|
pass |
|
|
|
elif part["type"] == PART.REQUEST_CANCELLATION_POLICY.value: |
|
pass |
|
|
|
elif part["type"] == PART.SABR_CONTEXT_UPDATE.value: |
|
|
|
logger.debug("SABR_CONTEXT_UPDATE") |
|
... |
|
|
|
ump.parse(callback) |
|
|
|
return { |
|
"initialized_formats": self.initialized_formats, |
|
"stream_protection_status": stream_protection_status, |
|
"sabr_redirect": sabr_redirect, |
|
"sabr_error": sabr_error |
|
} |
|
|
|
def process_media_header(self, data): |
|
media_header = MediaHeader.decode(data) |
|
|
|
if not media_header.formatId: |
|
return |
|
|
|
format_key = self.get_format_key(media_header.formatId) |
|
current_format = self.formats_by_key.get(format_key) or self.register_format(media_header) |
|
|
|
if not current_format: |
|
return |
|
|
|
sequence_number = media_header.sequenceNumber |
|
if sequence_number is not None: |
|
if format_key in self.previous_sequences: |
|
if sequence_number in self.previous_sequences[format_key]: |
|
return |
|
|
|
header_id = media_header.headerId |
|
if header_id is not None: |
|
if header_id not in self.header_id_to_format_key_map: |
|
self.header_id_to_format_key_map[header_id] = format_key |
|
|
|
if not any(seq.get("sequenceNumber") == (media_header.sequenceNumber or 0) for seq in |
|
current_format["sequenceList"]): |
|
current_format["sequenceList"].append({ |
|
"itag": media_header.itag, |
|
"formatId": media_header.formatId, |
|
"isInitSegment": media_header.isInitSeg, |
|
"durationMs": media_header.durationMs, |
|
"startMs": media_header.startMs, |
|
"startDataRange": media_header.startRange, |
|
"sequenceNumber": media_header.sequenceNumber, |
|
"contentLength": media_header.contentLength, |
|
"timeRange": media_header.timeRange |
|
}) |
|
|
|
if isinstance(sequence_number, int): |
|
current_format["_state"]["durationMs"] += media_header.durationMs |
|
current_format["_state"]["endSegmentIndex"] += 1 |
|
|
|
def process_media_data(self, data): |
|
header_id = data.get_uint8(0) |
|
stream_data = data.split(1)['remaining_buffer'] |
|
format_key = self.header_id_to_format_key_map.get(header_id) |
|
if not format_key: |
|
return |
|
|
|
current_format = self.formats_by_key.get(format_key) |
|
if not current_format: |
|
return |
|
|
|
current_format['mediaChunks'].append(stream_data.chunks[0]) |
|
|
|
def process_end_of_media(self, data): |
|
header_id = data.get_uint8(0) |
|
self.header_id_to_format_key_map.pop(header_id, None) |
|
|
|
def process_next_request_policy(self, data): |
|
next_request_policy = NextRequestPolicy.decode(data) |
|
self.playback_cookie = next_request_policy.playbackCookie |
|
|
|
def process_format_initialization(self, data): |
|
format_metadata = FormatInitializationMetadata.decode(data) |
|
self.register_format(format_metadata) |
|
|
|
def process_sabr_redirect(self, data): |
|
sabr_redirect = SabrRedirect.decode(data) |
|
if not sabr_redirect.url: |
|
raise ValueError("Invalid SABR redirect") |
|
self.server_abr_streaming_url = sabr_redirect.url |
|
return sabr_redirect |
|
|
|
@staticmethod |
|
def get_format_key(format_id) -> str: |
|
return f"{format_id['itag']};{format_id['lastModified']};" |
|
|
|
def register_format(self, data): |
|
if data.formatId is None: |
|
return None |
|
|
|
format_key = self.get_format_key(data.formatId) |
|
|
|
if format_key not in self.formats_by_key: |
|
format_ = { |
|
"formatId": data.formatId, |
|
"formatKey": format_key, |
|
"durationMs": data.durationMs, |
|
"mimeType": data.mimeType, |
|
"sequenceCount": data.endSegmentNumber, |
|
"sequenceList": [], |
|
"mediaChunks": [], |
|
"_state": { |
|
"formatId": data.formatId, |
|
"startTimeMs": 0, |
|
"durationMs": 0, |
|
"startSegmentIndex": 1, |
|
"endSegmentIndex": 0 |
|
} |
|
} |
|
self.initialized_formats.append(format_) |
|
self.formats_by_key[format_key] = self.initialized_formats[-1] |
|
return format_ |
|
return None |
|
|
|
def reload(self): |
|
logger.debug("Refreshing SABR streaming URL") |
|
|
|
self.RELOAD = True |
|
self.maximum_reload_attempt -= 1 |
|
|
|
self.youtube.vid_info = None |
|
refresh_url = self.youtube.server_abr_streaming_url |
|
if not refresh_url: |
|
raise ValueError("Invalid SABR refresh") |
|
self.server_abr_streaming_url = refresh_url |
|
self.video_playback_ustreamer_config = self.youtube.video_playback_ustreamer_config |
|
|
|
|
|
@staticmethod |
|
def base64_to_u8(base64_str): |
|
standard_base64 = base64_str.replace('-', '+').replace('_', '/') |
|
padded_base64 = standard_base64 + '=' * ((4 - len(standard_base64) % 4) % 4) |
|
byte_data = base64.b64decode(padded_base64) |
|
return bytearray(byte_data) |
|
|