# ytmp4 / pytubefix / sabr / video_streaming / buffered_range.py
# chipling: Upload 106 files (44bafb2, verified)
# All credits to https://github.com/LuanRT/googlevideo
from pytubefix.sabr.common import FormatId
from pytubefix.sabr.proto import BinaryWriter, BinaryReader
from pytubefix.sabr.video_streaming.time_range import TimeRange
def create_base_buffered_range():
    """Return a new buffered-range dict with every field at its default.

    A fresh dict is built on each call, so callers may mutate the result
    freely. Numeric fields default to ``0``; message-valued and unnamed
    fields default to ``None``.
    """
    field_defaults = (
        ("formatId", None),
        ("startTimeMs", 0),
        ("durationMs", 0),
        ("startSegmentIndex", 0),
        ("endSegmentIndex", 0),
        ("timeRange", None),
        ("field9", None),
        ("field11", None),
        ("field12", None),
    )
    return dict(field_defaults)
class Kob_Pa:
    """Sub-message with a ``videoId`` string (field 1) and an ``lmt`` integer (field 2).

    ``decode`` produces ``Kob_Pa`` instances; ``encode`` accepts either such an
    instance or a mapping with the same keys, so a decoded message can be
    written back out unchanged.
    """

    def __init__(self):
        self.videoId = ""
        self.lmt = 0

    @staticmethod
    def _field(message, name, default=None):
        """Read *name* from a mapping-style or attribute-style message."""
        getter = getattr(message, "get", None)
        if callable(getter):
            return getter(name, default)
        return getattr(message, name, default)

    @staticmethod
    def encode(message, writer=None):
        """Serialize *message* into *writer* and return the writer.

        Args:
            message: Mapping or object exposing ``videoId`` and ``lmt``.
            writer: Writer to append to; a new ``BinaryWriter`` is created
                only when this is ``None``.

        Returns:
            The writer that received the output.
        """
        # Compare against None explicitly: a caller-supplied writer must be
        # used even if it happens to evaluate as falsy (e.g. while empty).
        if writer is None:
            writer = BinaryWriter()
        video_id = Kob_Pa._field(message, "videoId")
        if video_id:
            # Tag 10 == (field 1 << 3) | wire type 2.
            writer.uint32(10).string(video_id)
        lmt = Kob_Pa._field(message, "lmt", 0)
        if lmt != 0:
            # Tag 16 == (field 2 << 3) | wire type 0.
            writer.uint32(16).uint64(lmt)
        return writer

    @staticmethod
    def decode(input_data, length=None):
        """Parse a ``Kob_Pa`` from *input_data*.

        Args:
            input_data: A ``BinaryReader`` to continue reading from, or raw
                data that is wrapped in a new ``BinaryReader``.
            length: Number of bytes belonging to this message, counted from
                the reader's current position; ``None`` reads up to
                ``reader.len``.

        Returns:
            A populated ``Kob_Pa`` instance.

        Raises:
            OverflowError: Propagated from ``long_to_number`` when ``lmt``
                falls outside the range that helper accepts.
        """
        reader = input_data if isinstance(input_data, BinaryReader) else BinaryReader(input_data)
        end = reader.len if length is None else reader.pos + length
        message = Kob_Pa()
        while reader.pos < end:
            tag = reader.uint32()
            if tag == 10:
                message.videoId = reader.string()
            elif tag == 16:
                message.lmt = long_to_number(reader.uint64())
            elif (tag & 7) == 4 or tag == 0:
                # Wire type 4 or a zero tag terminates this message.
                break
            else:
                # Unknown field: pass its wire type (low three tag bits) to
                # the reader so it can step over the payload.
                reader.skip(tag & 7)
        return message
class Kob:
    """Message holding a repeated ``EW`` list of ``Kob_Pa`` entries (field 1).

    ``decode`` produces ``Kob`` instances; ``encode`` accepts either such an
    instance or a mapping with an ``"EW"`` key.
    """

    def __init__(self):
        self.EW = []

    @staticmethod
    def _field(message, name, default=None):
        """Read *name* from a mapping-style or attribute-style message."""
        getter = getattr(message, "get", None)
        if callable(getter):
            return getter(name, default)
        return getattr(message, name, default)

    @staticmethod
    def encode(message, writer=None):
        """Serialize *message* into *writer* and return the writer.

        Args:
            message: Mapping or object exposing ``EW``, an iterable of
                entries passed to ``Kob_Pa.encode``. A missing or ``None``
                ``EW`` writes nothing.
            writer: Writer to append to; a new ``BinaryWriter`` is created
                only when this is ``None``.

        Returns:
            The writer that received the output.
        """
        # Compare against None explicitly so a caller-supplied writer is never
        # replaced just because it evaluates as falsy.
        if writer is None:
            writer = BinaryWriter()
        entries = Kob._field(message, "EW", None) or ()
        for entry in entries:
            # Each entry goes out under tag 10 (field 1, wire type 2) through
            # a forked writer that is joined back once the entry is written.
            Kob_Pa.encode(entry, writer.uint32(10).fork()).join()
        return writer

    @staticmethod
    def decode(input_data, length=None):
        """Parse a ``Kob`` from *input_data*.

        Args:
            input_data: A ``BinaryReader`` to continue reading from, or raw
                data that is wrapped in a new ``BinaryReader``.
            length: Number of bytes belonging to this message, counted from
                the reader's current position; ``None`` reads up to
                ``reader.len``.

        Returns:
            A ``Kob`` instance whose ``EW`` list holds the decoded entries in
            the order they were read.
        """
        reader = input_data if isinstance(input_data, BinaryReader) else BinaryReader(input_data)
        end = reader.len if length is None else reader.pos + length
        message = Kob()
        while reader.pos < end:
            tag = reader.uint32()
            if tag == 10:
                # The next varint is the entry's byte length.
                message.EW.append(Kob_Pa.decode(reader, reader.uint32()))
            elif (tag & 7) == 4 or tag == 0:
                # Wire type 4 or a zero tag terminates this message.
                break
            else:
                # Unknown field: pass its wire type to the reader to skip it.
                reader.skip(tag & 7)
        return message
class YPa:
    """Message with three int32 fields, ``field1``..``field3`` (fields 1-3).

    ``decode`` produces ``YPa`` instances; ``encode`` accepts either such an
    instance or a mapping with the same keys.
    """

    def __init__(self):
        self.field1 = 0
        self.field2 = 0
        self.field3 = 0

    @staticmethod
    def _field(message, name, default=None):
        """Read *name* from a mapping-style or attribute-style message."""
        getter = getattr(message, "get", None)
        if callable(getter):
            return getter(name, default)
        return getattr(message, name, default)

    @staticmethod
    def encode(message, writer=None):
        """Serialize *message* into *writer* and return the writer.

        Fields equal to ``0`` (or absent) are not written.

        Args:
            message: Mapping or object exposing ``field1``, ``field2`` and
                ``field3``.
            writer: Writer to append to; a new ``BinaryWriter`` is created
                only when this is ``None``.

        Returns:
            The writer that received the output.
        """
        # Compare against None explicitly so a caller-supplied writer is never
        # replaced just because it evaluates as falsy.
        if writer is None:
            writer = BinaryWriter()
        # Tags 8, 16 and 24 are fields 1, 2 and 3 with wire type 0.
        for name, tag in (("field1", 8), ("field2", 16), ("field3", 24)):
            value = YPa._field(message, name, 0)
            if value != 0:
                writer.uint32(tag).int32(value)
        return writer

    @staticmethod
    def decode(input_data, length=None):
        """Parse a ``YPa`` from *input_data*.

        Args:
            input_data: A ``BinaryReader`` to continue reading from, or raw
                data that is wrapped in a new ``BinaryReader``.
            length: Number of bytes belonging to this message, counted from
                the reader's current position; ``None`` reads up to
                ``reader.len``.

        Returns:
            A populated ``YPa`` instance.
        """
        reader = input_data if isinstance(input_data, BinaryReader) else BinaryReader(input_data)
        end = reader.len if length is None else reader.pos + length
        message = YPa()
        while reader.pos < end:
            tag = reader.uint32()
            if tag == 8:
                message.field1 = reader.int32()
            elif tag == 16:
                message.field2 = reader.int32()
            elif tag == 24:
                message.field3 = reader.int32()
            elif (tag & 7) == 4 or tag == 0:
                # Wire type 4 or a zero tag terminates this message.
                break
            else:
                # Unknown field: pass its wire type to the reader to skip it.
                reader.skip(tag & 7)
        return message
class BufferedRange:
    """Codec for the buffered-range message, represented as a plain dict.

    The dict layout is the one produced by ``create_base_buffered_range``.
    """

    @staticmethod
    def encode(message, writer=None):
        """Write *message* to *writer* (or a new ``BinaryWriter``) and return it.

        Scalar fields equal to ``0`` and nested fields equal to ``None`` are
        omitted. Nested messages are written through a forked writer that is
        joined back afterwards.
        """
        if not writer:
            writer = BinaryWriter()

        format_id = message.get("formatId")
        if format_id is not None:
            FormatId.encode(format_id, writer.uint32(10).fork()).join()

        # (dict key, tag, writer method) for the varint-encoded scalars.
        scalar_layout = (
            ("startTimeMs", 16, "int64"),
            ("durationMs", 24, "int64"),
            ("startSegmentIndex", 32, "int32"),
            ("endSegmentIndex", 40, "int32"),
        )
        for key, tag, method_name in scalar_layout:
            value = message.get(key, 0)
            if value != 0:
                getattr(writer.uint32(tag), method_name)(value)

        time_range = message.get("timeRange")
        if time_range is not None:
            TimeRange.encode(time_range, writer.uint32(50).fork()).join()

        field9 = message.get("field9")
        if field9 is not None:
            Kob.encode(field9, writer.uint32(74).fork()).join()

        field11 = message.get("field11")
        if field11 is not None:
            YPa.encode(field11, writer.uint32(90).fork()).join()

        field12 = message.get("field12")
        if field12 is not None:
            YPa.encode(field12, writer.uint32(98).fork()).join()

        return writer

    @staticmethod
    def decode(input_data, length=None):
        """Read a buffered-range dict from *input_data*.

        *input_data* is used directly when it is already a ``BinaryReader``;
        anything else is wrapped in one. When *length* is given, reading stops
        that many bytes past the reader's current position; otherwise it runs
        to ``reader.len``. Fields that never appear keep the defaults from
        ``create_base_buffered_range``.
        """
        if isinstance(input_data, BinaryReader):
            reader = input_data
        else:
            reader = BinaryReader(input_data)
        stop = reader.len if length is None else reader.pos + length
        result = create_base_buffered_range()

        while reader.pos < stop:
            tag = reader.uint32()
            # Each tag value already pins down the field number (tag >> 3),
            # so comparing the full tag is sufficient.
            if tag == 10:
                result["formatId"] = FormatId.decode(reader, reader.uint32())
            elif tag == 16:
                result["startTimeMs"] = long_to_number(reader.int64())
            elif tag == 24:
                result["durationMs"] = long_to_number(reader.int64())
            elif tag == 32:
                result["startSegmentIndex"] = reader.int32()
            elif tag == 40:
                result["endSegmentIndex"] = reader.int32()
            elif tag == 50:
                result["timeRange"] = TimeRange.decode(reader, reader.uint32())
            elif tag == 74:
                result["field9"] = Kob.decode(reader, reader.uint32())
            elif tag == 90:
                result["field11"] = YPa.decode(reader, reader.uint32())
            elif tag == 98:
                result["field12"] = YPa.decode(reader, reader.uint32())
            elif tag == 0 or (tag & 7) == 4:
                # A zero tag or wire type 4 ends the message.
                break
            else:
                # Unrecognised field: let the reader skip it by wire type.
                reader.skip(tag & 7)

        return result
def long_to_number(int64_value):
    """Convert *int64_value* to ``int``, rejecting values beyond +/-(2**53 - 1).

    The value is converted via its string form, so anything whose ``str()``
    is a valid integer literal is accepted.

    Raises:
        OverflowError: If the result is greater than 9007199254740991 or
            less than -9007199254740991.
    """
    safe_limit = (1 << 53) - 1
    number = int(str(int64_value))
    if -safe_limit <= number <= safe_limit:
        return number
    if number > safe_limit:
        raise OverflowError("Value is larger than 9007199254740991")
    raise OverflowError("Value is smaller than -9007199254740991")