|
|
|
|
|
class UMP:
    """Incremental parser for data in UMP format held in a chunked buffer.

    As read by :meth:`parse`, every part is framed as a variable-length
    integer part type, a variable-length integer payload size, and then
    ``size`` payload bytes.

    The buffer object is supplied by the caller and must provide
    ``can_read_bytes(offset, length)``, ``get_uint8(offset)``,
    ``split(position)`` (returning a mapping with ``'extracted_buffer'`` and
    ``'remaining_buffer'``), ``focus(offset)``, and the attributes
    ``chunks``, ``current_chunk_index``, ``current_chunk_offset`` and
    ``current_data_view``.
    """

    def __init__(self, chunked_data_buffer):
        """
        Creates a new UMP parser.

        :param chunked_data_buffer: Instance of a buffer containing data in UMP format.
        """
        self.chunked_data_buffer = chunked_data_buffer

    def parse(self, handle_part):
        """
        Parses parts of the buffer and calls the handler for each complete part.

        After each complete part, ``self.chunked_data_buffer`` is replaced by
        the buffer that follows that part, so parsing always restarts at
        offset 0 of the remaining data.

        :param handle_part: Function called with each complete part, a dict
            with keys ``"type"``, ``"size"`` and ``"data"`` (the extracted
            payload buffer).
        :return: A dict ``{"type", "size", "data"}`` describing a partial part
            when its header was decoded and at least one payload byte is
            available but the payload is not complete; ``"data"`` is then the
            whole current buffer (header included). Otherwise None.
        """
        while True:
            # Stop at the first incomplete varint; no point decoding the size
            # when the type itself is not available yet.
            part_type, offset = self.read_varint(0)
            if part_type < 0:
                break

            part_size, offset = self.read_varint(offset)
            if part_size < 0:
                break

            buffer = self.chunked_data_buffer

            if not buffer.can_read_bytes(offset, part_size):
                # Header only, no payload bytes at all: nothing to report.
                if not buffer.can_read_bytes(offset, 1):
                    break

                return {
                    "type": part_type,
                    "size": part_size,
                    "data": buffer,
                }

            # Drop the header, then cut the payload off the front of the rest.
            split_result = buffer.split(offset)['remaining_buffer'].split(part_size)

            handle_part({
                "type": part_type,
                "size": part_size,
                "data": split_result['extracted_buffer'],
            })

            # Advance only after the handler returned, as before.
            self.chunked_data_buffer = split_result['remaining_buffer']

        return None

    def read_varint(self, offset: int):
        """
        Reads a variable length integer from the buffer.

        The first byte selects the encoded length: ``< 128`` is 1 byte,
        ``< 192`` is 2, ``< 224`` is 3, ``< 240`` is 4, anything else is 5.
        For lengths 2-4 the low 6/5/4 bits of the first byte are the least
        significant bits of the value and the following bytes are
        little-endian. For length 5 the first byte carries no value bits and
        the next four bytes are a little-endian 32-bit integer.

        :param offset: Initial reading position.
        :return: Tuple (value, new offset), or (-1, offset) if incomplete.
        """
        buffer = self.chunked_data_buffer

        if not buffer.can_read_bytes(offset, 1):
            return -1, offset

        first_byte = buffer.get_uint8(offset)

        if first_byte < 128:
            byte_length = 1
        elif first_byte < 192:
            byte_length = 2
        elif first_byte < 224:
            byte_length = 3
        elif first_byte < 240:
            byte_length = 4
        else:
            byte_length = 5

        if not buffer.can_read_bytes(offset, byte_length):
            return -1, offset

        if byte_length == 1:
            return first_byte, offset + 1

        if byte_length == 5:
            temp_offset = offset + 1
            buffer.focus(temp_offset)

            if self.can_read_from_current_chunk(temp_offset, 4):
                # All four bytes sit in the focused chunk: decode in one go.
                # NOTE(review): relies on focus() keeping current_chunk_index,
                # current_chunk_offset and current_data_view in sync - confirm
                # against the buffer implementation.
                view = self.get_current_data_view()
                offset_in_chunk = temp_offset - buffer.current_chunk_offset
                value = int.from_bytes(
                    view[offset_in_chunk:offset_in_chunk + 4],
                    byteorder='little',
                )
            else:
                # The bytes straddle chunks: assemble them one at a time.
                value = 0
                for index in range(4):
                    value += buffer.get_uint8(temp_offset + index) << (8 * index)

            return value, offset + 5

        # Lengths 2-4: (8 - byte_length) value bits live in the first byte.
        prefix_bits = 8 - byte_length
        tail = 0
        for index in range(1, byte_length):
            tail += buffer.get_uint8(offset + index) << (8 * (index - 1))

        value = (first_byte & ((1 << prefix_bits) - 1)) + (tail << prefix_bits)
        return value, offset + byte_length

    def can_read_from_current_chunk(self, offset: int, length: int) -> bool:
        """
        Checks whether the specified number of bytes can be read from the current chunk.

        :param offset: Absolute position in the buffer.
        :param length: Number of bytes wanted starting at ``offset``.
        :return: True when ``offset + length``, taken relative to the buffer's
            ``current_chunk_offset``, does not exceed the length of the chunk
            at ``current_chunk_index``.
        """
        buffer = self.chunked_data_buffer
        current_chunk = buffer.chunks[buffer.current_chunk_index]
        return offset - buffer.current_chunk_offset + length <= len(current_chunk)

    def get_current_data_view(self):
        """
        Gets a binary view (memoryview) of the current chunk.

        The view is created lazily and stored on the buffer as
        ``current_data_view``; an existing view is returned as is.
        """
        buffer = self.chunked_data_buffer
        if buffer.current_data_view is None:
            chunk = buffer.chunks[buffer.current_chunk_index]
            buffer.current_data_view = memoryview(chunk)
        return buffer.current_data_view
|
|