Spaces:
Running
Running
File size: 4,934 Bytes
658460c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
from __future__ import annotations
import ctypes
import sys
from typing import Any
NOT_FOUND: object = object()
_MAX_FIELD_SEARCH_OFFSET = 50
if sys.maxsize > 2**32:
WORD_TYPE: type[ctypes.c_int32] | type[ctypes.c_int64] = ctypes.c_int64
WORD_N_BYTES = 8
else:
WORD_TYPE = ctypes.c_int32
WORD_N_BYTES = 4
class DeduperReloaderPatchingMixin:
@staticmethod
def infer_field_offset(
obj: object,
field: str,
) -> int:
field_value = getattr(obj, field, NOT_FOUND)
if field_value is NOT_FOUND:
return -1
obj_addr = ctypes.c_void_p.from_buffer(ctypes.py_object(obj)).value
field_addr = ctypes.c_void_p.from_buffer(ctypes.py_object(field_value)).value
if obj_addr is None or field_addr is None:
return -1
ret = -1
for offset in range(1, _MAX_FIELD_SEARCH_OFFSET):
if (
ctypes.cast(
obj_addr + WORD_N_BYTES * offset, ctypes.POINTER(WORD_TYPE)
).contents.value
== field_addr
):
ret = offset
break
return ret
@classmethod
def try_write_readonly_attr(
cls,
obj: object,
field: str,
new_value: object,
offset: int | None = None,
) -> None:
prev_value = getattr(obj, field, NOT_FOUND)
if prev_value is NOT_FOUND:
return
if offset is None:
offset = cls.infer_field_offset(obj, field)
if offset == -1:
return
obj_addr = ctypes.c_void_p.from_buffer(ctypes.py_object(obj)).value
new_value_addr = ctypes.c_void_p.from_buffer(ctypes.py_object(new_value)).value
if obj_addr is None or new_value_addr is None:
return
if prev_value is not None:
ctypes.pythonapi.Py_DecRef(ctypes.py_object(prev_value))
if new_value is not None:
ctypes.pythonapi.Py_IncRef(ctypes.py_object(new_value))
ctypes.cast(
obj_addr + WORD_N_BYTES * offset, ctypes.POINTER(WORD_TYPE)
).contents.value = new_value_addr
@classmethod
def try_patch_readonly_attr(
cls,
old: object,
new: object,
field: str,
new_is_value: bool = False,
offset: int = -1,
) -> None:
old_value = getattr(old, field, NOT_FOUND)
new_value = new if new_is_value else getattr(new, field, NOT_FOUND)
if old_value is NOT_FOUND or new_value is NOT_FOUND:
return
elif old_value is new_value:
return
elif old_value is not None and offset < 0:
offset = cls.infer_field_offset(old, field)
elif offset < 0:
assert not new_is_value
assert new_value is not None
offset = cls.infer_field_offset(new, field)
cls.try_write_readonly_attr(old, field, new_value, offset=offset)
@classmethod
def try_patch_attr(
cls,
old: object,
new: object,
field: str,
new_is_value: bool = False,
offset: int = -1,
) -> None:
try:
setattr(old, field, new if new_is_value else getattr(new, field))
except (AttributeError, TypeError, ValueError):
cls.try_patch_readonly_attr(old, new, field, new_is_value, offset)
@classmethod
def patch_function(
cls, to_patch_to: Any, to_patch_from: Any, is_method: bool
) -> None:
new_freevars = []
new_closure = []
for i, v in enumerate(to_patch_to.__code__.co_freevars):
if v not in to_patch_from.__code__.co_freevars or v == "__class__":
new_freevars.append(v)
new_closure.append(to_patch_to.__closure__[i])
for i, v in enumerate(to_patch_from.__code__.co_freevars):
if v not in new_freevars:
new_freevars.append(v)
new_closure.append(to_patch_from.__closure__[i])
code_with_new_freevars = to_patch_from.__code__.replace(
co_freevars=tuple(new_freevars)
)
# lambdas may complain if there is more than one freevar
cls.try_patch_attr(
to_patch_to, code_with_new_freevars, "__code__", new_is_value=True
)
offset = -1
if to_patch_to.__closure__ is None and to_patch_from.__closure__ is not None:
offset = cls.infer_field_offset(to_patch_from, "__closure__")
cls.try_patch_readonly_attr(
to_patch_to,
tuple(new_closure) or None,
"__closure__",
new_is_value=True,
offset=offset,
)
for attr in ("__defaults__", "__kwdefaults__", "__doc__", "__dict__"):
cls.try_patch_attr(to_patch_to, to_patch_from, attr)
if is_method:
cls.try_patch_readonly_attr(to_patch_to, to_patch_from, "__self__")
|