|
|
|
|
|
from typing import List, Optional |
|
|
|
from pytubefix.sabr.common import FormatId |
|
from pytubefix.sabr.proto import BinaryWriter, BinaryReader |
|
from pytubefix.sabr.video_streaming.buffered_range import BufferedRange |
|
from pytubefix.sabr.video_streaming.client_abr_state import ClientAbrState |
|
from pytubefix.sabr.video_streaming.streamer_context import StreamerContext |
|
|
|
|
|
class VideoPlaybackAbrRequest: |
|
def __init__(self): |
|
self.client_abr_state: Optional[ClientAbrState] = None |
|
self.selected_format_ids: List[FormatId] = [] |
|
self.buffered_ranges: List[BufferedRange] = [] |
|
self.player_time_ms: int = 0 |
|
self.video_playback_ustreamer_config: bytes = bytes() |
|
self.lo = None |
|
self.lj = None |
|
self.selected_audio_format_ids: List[FormatId] = [] |
|
self.selected_video_format_ids: List[FormatId] = [] |
|
self.streamer_context: Optional[StreamerContext] = None |
|
self.field1 = None |
|
self.field2 = None |
|
self.field3 = None |
|
self.field21: Optional[OQa] = None |
|
self.field22: int = 0 |
|
self.field23: int = 0 |
|
self.field1000: List[Pqa] = [] |
|
|
|
@staticmethod |
|
def encode(message: dict, writer=None): |
|
if writer is None: |
|
writer = BinaryWriter() |
|
|
|
if "clientAbrState" in message and message["clientAbrState"] is not None: |
|
writer.uint32(10) |
|
ClientAbrState.encode(message["clientAbrState"], writer.fork()) |
|
writer.join() |
|
|
|
for v in message.get("selectedFormatIds", []): |
|
writer.uint32(18) |
|
FormatId.encode(v, writer.fork()) |
|
writer.join() |
|
|
|
for v in message.get("bufferedRanges", []): |
|
writer.uint32(26) |
|
BufferedRange.encode(v, writer.fork()) |
|
writer.join() |
|
|
|
if message.get("playerTimeMs", 0): |
|
writer.uint32(32).int64(message["playerTimeMs"]) |
|
|
|
if message.get("videoPlaybackUstreamerConfig", b''): |
|
writer.uint32(42).bytes(message["videoPlaybackUstreamerConfig"]) |
|
|
|
if "lo" in message and message["lo"] is not None: |
|
writer.uint32(50) |
|
Lo.encode(message["lo"], writer.fork()) |
|
writer.join() |
|
|
|
for v in message.get("selectedAudioFormatIds", []): |
|
writer.uint32(130) |
|
FormatId.encode(v, writer.fork()) |
|
writer.join() |
|
|
|
for v in message.get("selectedVideoFormatIds", []): |
|
writer.uint32(138) |
|
FormatId.encode(v, writer.fork()) |
|
writer.join() |
|
|
|
if "streamerContext" in message and message["streamerContext"] is not None: |
|
writer.uint32(154) |
|
StreamerContext.encode(message["streamerContext"], writer.fork()) |
|
writer.join() |
|
|
|
if "field21" in message and message["field21"] is not None: |
|
writer.uint32(170) |
|
OQa.encode(message["field21"], writer.fork()) |
|
writer.join() |
|
|
|
if message.get("field22", 0): |
|
writer.uint32(176).int32(message["field22"]) |
|
|
|
if message.get("field23", 0): |
|
writer.uint32(184).int32(message["field23"]) |
|
|
|
for v in message.get("field1000", []): |
|
writer.uint32(8002) |
|
Pqa.encode(v, writer.fork()) |
|
writer.join() |
|
|
|
return writer |
|
|
|
@staticmethod |
|
def decode(input_data, length=None): |
|
reader = input_data if isinstance(input_data, BinaryReader) else BinaryReader(input_data) |
|
end = reader.len if length is None else reader.pos + length |
|
message = VideoPlaybackAbrRequest() |
|
|
|
while reader.pos < end: |
|
tag = reader.uint32() |
|
field_number = tag >> 3 |
|
|
|
if field_number == 1 and tag == 10: |
|
message.client_abr_state = ClientAbrState.decode(reader, reader.uint32()) |
|
continue |
|
elif field_number == 2 and tag == 18: |
|
message.selected_format_ids.append(FormatId.decode(reader, reader.uint32())) |
|
continue |
|
elif field_number == 3 and tag == 26: |
|
message.buffered_ranges.append(BufferedRange.decode(reader, reader.uint32())) |
|
continue |
|
elif field_number == 4 and tag == 32: |
|
message.player_time_ms = long_to_number(reader.int64()) |
|
continue |
|
elif field_number == 5 and tag == 42: |
|
message.video_playback_ustreamer_config = reader.bytes() |
|
continue |
|
elif field_number == 6 and tag == 50: |
|
message.lo = Lo.decode(reader, reader.uint32()) |
|
continue |
|
elif field_number == 16 and tag == 130: |
|
message.selected_audio_format_ids.append(FormatId.decode(reader, reader.uint32())) |
|
continue |
|
elif field_number == 17 and tag == 138: |
|
message.selected_video_format_ids.append(FormatId.decode(reader, reader.uint32())) |
|
continue |
|
elif field_number == 19 and tag == 154: |
|
message.streamer_context = StreamerContext.decode(reader, reader.uint32()) |
|
continue |
|
elif field_number == 21 and tag == 170: |
|
message.field21 = OQa.decode(reader, reader.uint32()) |
|
continue |
|
elif field_number == 22 and tag == 176: |
|
message.field22 = reader.int32() |
|
continue |
|
elif field_number == 23 and tag == 184: |
|
message.field23 = reader.int32() |
|
continue |
|
elif field_number == 1000 and tag == 8002: |
|
message.field1000.append(Pqa.decode(reader, reader.uint32())) |
|
continue |
|
|
|
|
|
return message |
|
|
|
|
|
class Lo: |
|
def __init__(self): |
|
self.format_id: Optional[FormatId] = None |
|
self.Lj: int = 0 |
|
self.sequence_number: int = 0 |
|
self.field4: Optional[Lo_Field4] = None |
|
self.MZ: int = 0 |
|
|
|
@staticmethod |
|
def encode(message: dict, writer=None): |
|
if writer is None: |
|
writer = BinaryWriter() |
|
|
|
for v in message.get("field1", []): |
|
writer.uint32(10).string(v) |
|
|
|
if "field2" in message and message["field2"] is not None: |
|
writer.uint32(18).int32(message["field2"]) |
|
|
|
if "field3" in message and message["field3"] is not None: |
|
writer.uint32(26).int32(message["field3"]) |
|
|
|
if "field4" in message and message["field4"] is not None: |
|
writer.uint32(32).int32(message["field4"]) |
|
|
|
if "field5" in message and message["field5"] is not None: |
|
writer.uint32(40).int32(message["field5"]) |
|
|
|
if "field6" in message and message["field6"] is not None: |
|
writer.uint32(50).int32(message["field6"]) |
|
|
|
return writer |
|
|
|
@staticmethod |
|
def decode(input_data, length=None): |
|
reader = input_data if isinstance(input_data, BinaryReader) else BinaryReader(input_data) |
|
end = reader.len if length is None else reader.pos + length |
|
message = Lo() |
|
|
|
while reader.pos < end: |
|
tag = reader.uint32() |
|
field_number = tag >> 3 |
|
|
|
if field_number == 1 and tag == 10: |
|
message.format_id = FormatId.decode(reader, reader.uint32()) |
|
continue |
|
elif field_number == 2 and tag == 16: |
|
message.Lj = reader.int32() |
|
continue |
|
elif field_number == 3 and tag == 24: |
|
message.sequenceNumber = reader.int32() |
|
continue |
|
elif field_number == 4 and tag == 34: |
|
message.field4 = Lo_Field4.decode(reader, reader.uint32()) |
|
continue |
|
elif field_number == 5 and tag == 40: |
|
message.MZ = reader.int32() |
|
continue |
|
|
|
return message |
|
|
|
|
|
class Lo_Field4: |
|
def __init__(self): |
|
self.field1: int = 0 |
|
self.field2: int = 0 |
|
self.field3: int = 0 |
|
|
|
@staticmethod |
|
def encode(message: dict, writer=None): |
|
if writer is None: |
|
writer = BinaryWriter() |
|
|
|
if "field1" in message and message["field1"] is not None: |
|
writer.uint32(8).int32(message["field1"]) |
|
|
|
if "field2" in message and message["field2"] is not None: |
|
writer.uint32(16).int32(message["field2"]) |
|
|
|
if "field3" in message and message["field3"] is not None: |
|
writer.uint32(24).int32(message["field3"]) |
|
|
|
return writer |
|
|
|
@staticmethod |
|
def decode(input_data, length=None): |
|
reader = input_data if isinstance(input_data, BinaryReader) else BinaryReader(input_data) |
|
end = reader.len if length is None else reader.pos + length |
|
message = Lo_Field4() |
|
|
|
while reader.pos < end: |
|
tag = reader.uint32() |
|
field_number = tag >> 3 |
|
|
|
if field_number == 1 and tag == 8: |
|
message.field1 = reader.int32() |
|
continue |
|
elif field_number == 2 and tag == 16: |
|
message.field2 = reader.int32() |
|
continue |
|
elif field_number == 3 and tag == 24: |
|
message.field3 = reader.int32() |
|
continue |
|
|
|
return message |
|
|
|
|
|
class OQa: |
|
def __init__(self): |
|
self.field1: List[str] = [] |
|
self.field2: bytes = bytes() |
|
self.field3: str = "" |
|
self.field4: int = 0 |
|
self.field5: int = 0 |
|
self.field6: str = "" |
|
|
|
@staticmethod |
|
def encode(message: dict, writer=None): |
|
if writer is None: |
|
writer = BinaryWriter() |
|
|
|
if "field1" in message and message["field1"] is not None: |
|
writer.uint32(8).int32(message["field1"]) |
|
|
|
if "field2" in message and message["field2"] is not None: |
|
writer.uint32(16).int32(message["field2"]) |
|
|
|
if "field3" in message and message["field3"] is not None: |
|
writer.uint32(24).int32(message["field3"]) |
|
|
|
return writer |
|
|
|
@staticmethod |
|
def decode(input_data, length=None): |
|
reader = input_data if isinstance(input_data, BinaryReader) else BinaryReader(input_data) |
|
end = reader.len if length is None else reader.pos + length |
|
message = OQa() |
|
|
|
while reader.pos < end: |
|
tag = reader.uint32() |
|
field_number = tag >> 3 |
|
|
|
if field_number == 1 and tag == 10: |
|
message.field1.append(reader.string()) |
|
continue |
|
elif field_number == 2 and tag == 18: |
|
message.field2 = reader.bytes() |
|
continue |
|
elif field_number == 3 and tag == 26: |
|
message.field3 = reader.string() |
|
continue |
|
elif field_number == 4 and tag == 32: |
|
message.field4 = reader.int32() |
|
continue |
|
elif field_number == 5 and tag == 40: |
|
message.field5 = reader.int32() |
|
continue |
|
elif field_number == 6 and tag == 50: |
|
message.field6 = reader.string() |
|
continue |
|
|
|
return message |
|
|
|
|
|
class Pqa: |
|
def __init__(self): |
|
self.formats: List[FormatId] = [] |
|
self.ud: List[BufferedRange] = [] |
|
self.clip_id: str = "" |
|
|
|
@staticmethod |
|
def encode(message: dict, writer=None): |
|
if writer is None: |
|
writer = BinaryWriter() |
|
|
|
for v in message.get("formats", []): |
|
FormatId.encode(v, writer.uint32(10).fork()).join() |
|
|
|
for v in message.get("ud", []): |
|
BufferedRange.encode(v, writer.uint32(18).fork()).join() |
|
|
|
if "clipId" in message and message["clipId"] is not None: |
|
writer.uint32(26).int32(message["clipId"]) |
|
|
|
return writer |
|
|
|
@staticmethod |
|
def decode(input_data, length=None): |
|
reader = input_data if isinstance(input_data, BinaryReader) else BinaryReader(input_data) |
|
message = Pqa() |
|
end = reader.len if length is None else reader.pos + length |
|
|
|
while reader.pos < end: |
|
tag = reader.uint32() |
|
field_number = tag >> 3 |
|
|
|
if field_number == 1 and tag == 10: |
|
message.formats.append(FormatId.decode(reader, reader.uint32())) |
|
continue |
|
elif field_number == 2 and tag == 18: |
|
message.ud.append(BufferedRange.decode(reader, reader.uint32())) |
|
continue |
|
elif field_number == 3 and tag == 26: |
|
message.clip_id = reader.string() |
|
continue |
|
|
|
return message |
|
|
|
def long_to_number(int64_value): |
|
value = int(str(int64_value)) |
|
if value > (2 ** 53 - 1): |
|
raise OverflowError("Value is larger than 9007199254740991") |
|
if value < -(2 ** 53 - 1): |
|
raise OverflowError("Value is smaller than -9007199254740991") |
|
return value |