File size: 3,841 Bytes
44bafb2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# All credits to https://github.com/LuanRT/googlevideo

class ChunkedDataBuffer:
    def __init__(self, chunks=None):
        """
        Initializes a new ChunkedDataBuffer with the given chunks.
        """
        self.chunks = []
        self.current_chunk_index = 0
        self.current_chunk_offset = 0
        self.current_data_view = None
        self.total_length = 0

        chunks = chunks or []
        for chunk in chunks:
            self.append(chunk)

    def get_length(self):
        return self.total_length

    def append(self, chunk):
        """
        Adds a new chunk to the buffer, merging with the previous one if possible.
        """
        if self.can_merge_with_last_chunk(chunk):
            last_chunk = self.chunks[-1]
            merged = bytearray(last_chunk)
            merged.extend(chunk)
            self.chunks[-1] = bytes(merged)
            self.reset_focus()
        else:
            self.chunks.append(chunk)
        self.total_length += len(chunk)

    def split(self, position):
        """
        Split the buffer at the specified position into two: extracted and remainder.
        """
        extracted_buffer = ChunkedDataBuffer()
        remaining_buffer = ChunkedDataBuffer()
        remaining_pos = position

        for chunk in self.chunks:
            chunk_len = len(chunk)
            if remaining_pos >= chunk_len:
                extracted_buffer.append(chunk)
                remaining_pos -= chunk_len
            elif remaining_pos > 0:
                extracted_buffer.append(chunk[:remaining_pos])
                remaining_buffer.append(chunk[remaining_pos:])
                remaining_pos = 0
            else:
                remaining_buffer.append(chunk)

        return {
            "extracted_buffer": extracted_buffer,
            "remaining_buffer": remaining_buffer
        }

    def is_focused(self, position):
        """
        Checks if the position is within the currently focused chunk.
        """
        chunk = self.chunks[self.current_chunk_index]
        return self.current_chunk_offset <= position < self.current_chunk_offset + len(chunk)

    def focus(self, position):
        """
        Moves the internal focus to the chunk containing the specified position.
        """
        if not self.is_focused(position):
            if position < self.current_chunk_offset:
                self.reset_focus()
            while (self.current_chunk_offset + len(self.chunks[self.current_chunk_index]) <= position and
                   self.current_chunk_index < len(self.chunks) - 1):
                self.current_chunk_offset += len(self.chunks[self.current_chunk_index])
                self.current_chunk_index += 1
            self.current_data_view = None

    def can_read_bytes(self, position, length):
        """
        Checks whether `length` bytes can be read from `position`.
        """
        return position + length <= self.total_length

    def get_uint8(self, position):
        """
        Returns a single byte (as int) from the specified position.
        """
        self.focus(position)
        chunk = self.chunks[self.current_chunk_index]
        return chunk[position - self.current_chunk_offset]

    def can_merge_with_last_chunk(self, chunk):
        """
        Checks if the new chunk can be merged with the last one.
        """
        if not self.chunks:
            return False
        last_chunk = self.chunks[-1]
        return (
            last_chunk is not None and
            isinstance(last_chunk, (bytes, bytearray)) and
            isinstance(chunk, (bytes, bytearray))
        )

    def reset_focus(self):
        """
        Resets focus to the beginning of the buffer.
        """
        self.current_data_view = None
        self.current_chunk_index = 0
        self.current_chunk_offset = 0