# All credits to https://github.com/LuanRT/googlevideo import base64 import logging from enum import Enum from typing import Optional from collections.abc import Callable from urllib.request import Request, urlopen from pytubefix.sabr.core.UMP import UMP from pytubefix.monostate import Monostate from pytubefix.exceptions import SABRError from pytubefix.sabr.video_streaming.sabr_error import SabrError from pytubefix.sabr.video_streaming.media_header import MediaHeader from pytubefix.sabr.core.chunked_data_buffer import ChunkedDataBuffer from pytubefix.sabr.video_streaming.sabr_redirect import SabrRedirect from pytubefix.sabr.video_streaming.playback_cookie import PlaybackCookie from pytubefix.sabr.video_streaming.next_request_policy import NextRequestPolicy from pytubefix.sabr.video_streaming.stream_protection_status import StreamProtectionStatus from pytubefix.sabr.video_streaming.video_playback_abr_request import VideoPlaybackAbrRequest from pytubefix.sabr.video_streaming.format_initialization_metadata import FormatInitializationMetadata logger = logging.getLogger(__name__) # https://github.com/davidzeng0/innertube/blob/main/googlevideo/ump.md class PART(Enum): ONESIE_HEADER = 10 ONESIE_DATA = 11 MEDIA_HEADER = 20 MEDIA = 21 MEDIA_END = 22 LIVE_METADATA = 31 HOSTNAME_CHANGE_HINT = 32 LIVE_METADATA_PROMISE = 33 LIVE_METADATA_PROMISE_CANCELLATION = 34 NEXT_REQUEST_POLICY = 35 USTREAMER_VIDEO_AND_FORMAT_DATA = 36 FORMAT_SELECTION_CONFIG = 37 USTREAMER_SELECTED_MEDIA_STREAM = 38 FORMAT_INITIALIZATION_METADATA = 42 SABR_REDIRECT = 43 SABR_ERROR = 44 SABR_SEEK = 45 RELOAD_PLAYER_RESPONSE = 46 PLAYBACK_START_POLICY = 47 ALLOWED_CACHED_FORMATS = 48 START_BW_SAMPLING_HINT = 49 PAUSE_BW_SAMPLING_HINT = 50 SELECTABLE_FORMATS = 51 REQUEST_IDENTIFIER = 52 REQUEST_CANCELLATION_POLICY = 53 ONESIE_PREFETCH_REJECTION = 54 TIMELINE_CONTEXT = 55 REQUEST_PIPELINING = 56 SABR_CONTEXT_UPDATE = 57 STREAM_PROTECTION_STATUS = 58 SABR_CONTEXT_SENDING_POLICY = 59 LAWNMOWER_POLICY = 60 SABR_ACK = 61 END_OF_TRACK = 62 CACHE_LOAD_POLICY = 63 LAWNMOWER_MESSAGING_POLICY = 64 PREWARM_CONNECTION = 65 PLAYBACK_DEBUG_INFO = 66 SNACKBAR_MESSAGE = 67 class ServerAbrStream: def __init__(self, stream, write_chunk: Callable, monostate: Monostate): self.stream = stream self.write_chunk = write_chunk self.youtube = monostate.youtube self.po_token = self.stream.po_token self.server_abr_streaming_url = self.stream.url self.video_playback_ustreamer_config = self.stream.video_playback_ustreamer_config self.totalDurationMs = int(self.stream.durationMs) self.bytes_remaining = self.stream.filesize self.initialized_formats = [] self.formats_by_key = {} self.playback_cookie = None self.header_id_to_format_key_map = {} self.previous_sequences = {} self.RELOAD = False self.maximum_reload_attempt = 3 def emit(self, data): for formatId in data['initialized_formats']: if formatId['formatId']['itag'] == self.stream.itag: media_chunks = formatId['mediaChunks'] for chunk in media_chunks: self.bytes_remaining -= len(chunk) self.write_chunk(chunk, self.bytes_remaining) def start(self): audio_format = [{'itag': self.stream.itag, 'lastModified': int(self.stream.last_Modified), 'xtags': self.stream.xtags}] if self.stream.type == 'audio' else [] video_format = [{'itag': self.stream.itag, 'lastModified': int(self.stream.last_Modified), 'xtags': self.stream.xtags}] if self.stream.type == 'video' else [] client_abr_state = { 'lastManualDirection': 0, 'timeSinceLastManualFormatSelectionMs': 0, 'lastManualSelectedResolution': int(self.stream.resolution.replace('p', '')) if video_format else 720, 'stickyResolution': int(self.stream.resolution.replace('p', '')) if video_format else 720, 'playerTimeMs': 0, 'visibility': 0, 'drcEnabled': self.stream.is_drc, # 0 = BOTH, 1 = AUDIO (video-only is no longer supported by YouTube) 'enabledTrackTypesBitfield': 0 if video_format else 1 } while client_abr_state['playerTimeMs'] < self.totalDurationMs and self.maximum_reload_attempt > 0: data = self.fetch_media(client_abr_state, audio_format, video_format) if data.get("sabr_error", None): logger.debug(data.get('sabr_error').type) self.reload() self.emit(data) if client_abr_state['enabledTrackTypesBitfield'] == 0: main_format = next((fmt for fmt in data['initialized_formats'] if "video" in (fmt.get("mimeType") or "")), None) else: main_format = data['initialized_formats'][0] if data['initialized_formats'] else None for fmt in data['initialized_formats']: format_key = fmt["formatKey"] sequence_numbers = [seq.get("sequenceNumber", 0) for seq in fmt["sequenceList"]] self.previous_sequences[format_key] = sequence_numbers if ( not self.RELOAD and ( main_format is None or ("sequenceList" in main_format and not main_format.get("sequenceList", None)) ) ): logger.debug("The Abr server did not return any chunks") self.reload() if self.maximum_reload_attempt > 0 and self.RELOAD: self.RELOAD = False continue elif self.maximum_reload_attempt <= 0: raise SABRError("SABR Maximum reload attempts reached") if ( not main_format or main_format["sequenceCount"] == main_format["sequenceList"][-1].get("sequenceNumber") ): break total_sequence_duration = sum(seq.get("durationMs", 0) for seq in main_format["sequenceList"]) client_abr_state['playerTimeMs'] += total_sequence_duration def fetch_media(self, client_abr_state, audio_format, video_format): body = VideoPlaybackAbrRequest.encode({ 'clientAbrState': client_abr_state, 'selectedAudioFormatIds': audio_format, 'selectedVideoFormatIds': video_format, 'selectedFormatIds': [fmt["formatId"] for fmt in self.initialized_formats], 'videoPlaybackUstreamerConfig': self.base64_to_u8(self.video_playback_ustreamer_config), 'streamerContext': { 'field5': [], 'field6': [], 'poToken': self.base64_to_u8(self.po_token) if self.po_token else None, 'playbackCookie': PlaybackCookie.encode(self.playback_cookie).finish() if self.playback_cookie else None, 'clientInfo': { 'clientName': 1, 'clientVersion': '2.20250523.01.00', 'osName': 'Windows', 'osVersion': '10.0', 'platform': 'DESKTOP' } }, 'bufferedRanges': [fmt["_state"] for fmt in self.initialized_formats], 'field1000': [] }).finish() base_headers = { "User-Agent": "Mozilla/5.0", "accept-language": "en-US,en", "Content-Type": "application/vnd.yt-ump", } request = Request(self.server_abr_streaming_url, headers=base_headers, method="POST", data=bytes(body)) return self.parse_ump_response(bytes(urlopen(request).read())) def parse_ump_response(self, response): self.header_id_to_format_key_map.clear() for k, v in enumerate(self.initialized_formats): self.initialized_formats[k]['sequenceList'] = [] self.initialized_formats[k]['mediaChunks'] = [] sabr_error: Optional[SabrError] = None sabr_redirect: Optional[SabrRedirect] = None stream_protection_status: Optional[StreamProtectionStatus] = None ump = UMP(ChunkedDataBuffer([response])) def callback(part): data = list(part['data'].chunks[0] if part['data'].chunks else []) if part['type'] == PART.MEDIA_HEADER.value: self.process_media_header(data) elif part['type'] == PART.MEDIA.value: self.process_media_data(part['data']) elif part['type'] == PART.MEDIA_END.value: self.process_end_of_media(part['data']) elif part['type'] == PART.NEXT_REQUEST_POLICY.value: self.process_next_request_policy(data) elif part['type'] == PART.FORMAT_INITIALIZATION_METADATA.value: self.process_format_initialization(data) elif part['type'] == PART.SABR_ERROR.value: nonlocal sabr_error sabr_error = SabrError.decode(data) elif part['type'] == PART.SABR_REDIRECT.value: nonlocal sabr_redirect sabr_redirect = self.process_sabr_redirect(data) logger.debug("SABR_REDIRECT") elif part['type'] == PART.STREAM_PROTECTION_STATUS.value: nonlocal stream_protection_status stream_protection_status = StreamProtectionStatus.decode(data) elif part['type'] == PART.RELOAD_PLAYER_RESPONSE.value: logger.debug("RELOAD_PLAYER_RESPONSE") self.reload() elif part["type"] == PART.PLAYBACK_START_POLICY.value: pass elif part["type"] == PART.REQUEST_CANCELLATION_POLICY.value: pass elif part["type"] == PART.SABR_CONTEXT_UPDATE.value: # TODO: Find out how to implement this part logger.debug("SABR_CONTEXT_UPDATE") ... ump.parse(callback) return { "initialized_formats": self.initialized_formats, "stream_protection_status": stream_protection_status, "sabr_redirect": sabr_redirect, "sabr_error": sabr_error } def process_media_header(self, data): media_header = MediaHeader.decode(data) if not media_header.formatId: return format_key = self.get_format_key(media_header.formatId) current_format = self.formats_by_key.get(format_key) or self.register_format(media_header) if not current_format: return sequence_number = media_header.sequenceNumber if sequence_number is not None: if format_key in self.previous_sequences: if sequence_number in self.previous_sequences[format_key]: return header_id = media_header.headerId if header_id is not None: if header_id not in self.header_id_to_format_key_map: self.header_id_to_format_key_map[header_id] = format_key if not any(seq.get("sequenceNumber") == (media_header.sequenceNumber or 0) for seq in current_format["sequenceList"]): current_format["sequenceList"].append({ "itag": media_header.itag, "formatId": media_header.formatId, "isInitSegment": media_header.isInitSeg, "durationMs": media_header.durationMs, "startMs": media_header.startMs, "startDataRange": media_header.startRange, "sequenceNumber": media_header.sequenceNumber, "contentLength": media_header.contentLength, "timeRange": media_header.timeRange }) if isinstance(sequence_number, int): current_format["_state"]["durationMs"] += media_header.durationMs current_format["_state"]["endSegmentIndex"] += 1 def process_media_data(self, data): header_id = data.get_uint8(0) stream_data = data.split(1)['remaining_buffer'] format_key = self.header_id_to_format_key_map.get(header_id) if not format_key: return current_format = self.formats_by_key.get(format_key) if not current_format: return current_format['mediaChunks'].append(stream_data.chunks[0]) def process_end_of_media(self, data): header_id = data.get_uint8(0) self.header_id_to_format_key_map.pop(header_id, None) def process_next_request_policy(self, data): next_request_policy = NextRequestPolicy.decode(data) self.playback_cookie = next_request_policy.playbackCookie def process_format_initialization(self, data): format_metadata = FormatInitializationMetadata.decode(data) self.register_format(format_metadata) def process_sabr_redirect(self, data): sabr_redirect = SabrRedirect.decode(data) if not sabr_redirect.url: raise ValueError("Invalid SABR redirect") self.server_abr_streaming_url = sabr_redirect.url return sabr_redirect @staticmethod def get_format_key(format_id) -> str: return f"{format_id['itag']};{format_id['lastModified']};" def register_format(self, data): if data.formatId is None: return None format_key = self.get_format_key(data.formatId) if format_key not in self.formats_by_key: format_ = { "formatId": data.formatId, "formatKey": format_key, "durationMs": data.durationMs, "mimeType": data.mimeType, "sequenceCount": data.endSegmentNumber, "sequenceList": [], "mediaChunks": [], "_state": { "formatId": data.formatId, "startTimeMs": 0, "durationMs": 0, "startSegmentIndex": 1, "endSegmentIndex": 0 } } self.initialized_formats.append(format_) self.formats_by_key[format_key] = self.initialized_formats[-1] return format_ return None def reload(self): logger.debug("Refreshing SABR streaming URL") self.RELOAD = True self.maximum_reload_attempt -= 1 self.youtube.vid_info = None refresh_url = self.youtube.server_abr_streaming_url if not refresh_url: raise ValueError("Invalid SABR refresh") self.server_abr_streaming_url = refresh_url self.video_playback_ustreamer_config = self.youtube.video_playback_ustreamer_config @staticmethod def base64_to_u8(base64_str): standard_base64 = base64_str.replace('-', '+').replace('_', '/') padded_base64 = standard_base64 + '=' * ((4 - len(standard_base64) % 4) % 4) byte_data = base64.b64decode(padded_base64) return bytearray(byte_data)