ytmp4 / pytubefix /sabr /core /chunked_data_buffer.py
chipling's picture
Upload 106 files
44bafb2 verified
# All credits to https://github.com/LuanRT/googlevideo
class ChunkedDataBuffer:
def __init__(self, chunks=None):
"""
Initializes a new ChunkedDataBuffer with the given chunks.
"""
self.chunks = []
self.current_chunk_index = 0
self.current_chunk_offset = 0
self.current_data_view = None
self.total_length = 0
chunks = chunks or []
for chunk in chunks:
self.append(chunk)
def get_length(self):
return self.total_length
def append(self, chunk):
"""
Adds a new chunk to the buffer, merging with the previous one if possible.
"""
if self.can_merge_with_last_chunk(chunk):
last_chunk = self.chunks[-1]
merged = bytearray(last_chunk)
merged.extend(chunk)
self.chunks[-1] = bytes(merged)
self.reset_focus()
else:
self.chunks.append(chunk)
self.total_length += len(chunk)
def split(self, position):
"""
Split the buffer at the specified position into two: extracted and remainder.
"""
extracted_buffer = ChunkedDataBuffer()
remaining_buffer = ChunkedDataBuffer()
remaining_pos = position
for chunk in self.chunks:
chunk_len = len(chunk)
if remaining_pos >= chunk_len:
extracted_buffer.append(chunk)
remaining_pos -= chunk_len
elif remaining_pos > 0:
extracted_buffer.append(chunk[:remaining_pos])
remaining_buffer.append(chunk[remaining_pos:])
remaining_pos = 0
else:
remaining_buffer.append(chunk)
return {
"extracted_buffer": extracted_buffer,
"remaining_buffer": remaining_buffer
}
def is_focused(self, position):
"""
Checks if the position is within the currently focused chunk.
"""
chunk = self.chunks[self.current_chunk_index]
return self.current_chunk_offset <= position < self.current_chunk_offset + len(chunk)
def focus(self, position):
"""
Moves the internal focus to the chunk containing the specified position.
"""
if not self.is_focused(position):
if position < self.current_chunk_offset:
self.reset_focus()
while (self.current_chunk_offset + len(self.chunks[self.current_chunk_index]) <= position and
self.current_chunk_index < len(self.chunks) - 1):
self.current_chunk_offset += len(self.chunks[self.current_chunk_index])
self.current_chunk_index += 1
self.current_data_view = None
def can_read_bytes(self, position, length):
"""
Checks whether `length` bytes can be read from `position`.
"""
return position + length <= self.total_length
def get_uint8(self, position):
"""
Returns a single byte (as int) from the specified position.
"""
self.focus(position)
chunk = self.chunks[self.current_chunk_index]
return chunk[position - self.current_chunk_offset]
def can_merge_with_last_chunk(self, chunk):
"""
Checks if the new chunk can be merged with the last one.
"""
if not self.chunks:
return False
last_chunk = self.chunks[-1]
return (
last_chunk is not None and
isinstance(last_chunk, (bytes, bytearray)) and
isinstance(chunk, (bytes, bytearray))
)
def reset_focus(self):
"""
Resets focus to the beginning of the buffer.
"""
self.current_data_view = None
self.current_chunk_index = 0
self.current_chunk_offset = 0