muni's picture
Add TinyTroupe
82a7a28
import json
import copy
from tinytroupe.utils import logger
class JsonSerializableRegistry:
"""
A mixin class that provides JSON serialization, deserialization, and subclass registration.
"""
class_mapping = {}
def to_json(self, include: list = None, suppress: list = None, file_path: str = None,
serialization_type_field_name = "json_serializable_class_name") -> dict:
"""
Returns a JSON representation of the object.
Args:
include (list, optional): Attributes to include in the serialization. Will override the default behavior.
suppress (list, optional): Attributes to suppress from the serialization. Will override the default behavior.
file_path (str, optional): Path to a file where the JSON will be written.
"""
# Gather all serializable attributes from the class hierarchy
serializable_attrs = set()
suppress_attrs = set()
for cls in self.__class__.__mro__: # Traverse the class hierarchy
if hasattr(cls, 'serializable_attributes') and isinstance(cls.serializable_attributes, list):
serializable_attrs.update(cls.serializable_attributes)
if hasattr(cls, 'suppress_attributes_from_serialization') and isinstance(cls.suppress_attributes_from_serialization, list):
suppress_attrs.update(cls.suppress_attributes_from_serialization)
# Override attributes with method parameters if provided
if include:
serializable_attrs = set(include)
if suppress:
suppress_attrs.update(suppress)
result = {serialization_type_field_name: self.__class__.__name__}
for attr in serializable_attrs if serializable_attrs else self.__dict__:
if attr not in suppress_attrs:
value = getattr(self, attr, None)
attr_renamed = self._programmatic_name_to_json_name(attr)
if isinstance(value, JsonSerializableRegistry):
result[attr_renamed] = value.to_json(serialization_type_field_name=serialization_type_field_name)
elif isinstance(value, list):
result[attr_renamed] = [item.to_json(serialization_type_field_name=serialization_type_field_name) if isinstance(item, JsonSerializableRegistry) else copy.deepcopy(item) for item in value]
elif isinstance(value, dict):
result[attr_renamed] = {k: v.to_json(serialization_type_field_name=serialization_type_field_name) if isinstance(v, JsonSerializableRegistry) else copy.deepcopy(v) for k, v in value.items()}
else:
result[attr_renamed] = copy.deepcopy(value)
if file_path:
# Create directories if they do not exist
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'w') as f:
json.dump(result, f, indent=4)
return result
@classmethod
def from_json(cls, json_dict_or_path, suppress: list = None,
serialization_type_field_name = "json_serializable_class_name",
post_init_params: dict = None):
"""
Loads a JSON representation of the object and creates an instance of the class.
Args:
json_dict_or_path (dict or str): The JSON dictionary representing the object or a file path to load the JSON from.
suppress (list, optional): Attributes to suppress from being loaded.
Returns:
An instance of the class populated with the data from json_dict_or_path.
"""
if isinstance(json_dict_or_path, str):
with open(json_dict_or_path, 'r') as f:
json_dict = json.load(f)
else:
json_dict = json_dict_or_path
subclass_name = json_dict.get(serialization_type_field_name)
target_class = cls.class_mapping.get(subclass_name, cls)
instance = target_class.__new__(target_class) # Create an instance without calling __init__
# Gather all serializable attributes from the class hierarchy
serializable_attrs = set()
custom_serialization_initializers = {}
suppress_attrs = set(suppress) if suppress else set()
for target_mro in target_class.__mro__:
if hasattr(target_mro, 'serializable_attributes') and isinstance(target_mro.serializable_attributes, list):
serializable_attrs.update(target_mro.serializable_attributes)
if hasattr(target_mro, 'custom_serialization_initializers') and isinstance(target_mro.custom_serialization_initializers, dict):
custom_serialization_initializers.update(target_mro.custom_serialization_initializers)
if hasattr(target_mro, 'suppress_attributes_from_serialization') and isinstance(target_mro.suppress_attributes_from_serialization, list):
suppress_attrs.update(target_mro.suppress_attributes_from_serialization)
# Assign values only for serializable attributes if specified, otherwise assign everything
for key in serializable_attrs if serializable_attrs else json_dict:
key_in_json = cls._programmatic_name_to_json_name(key)
if key_in_json in json_dict and key not in suppress_attrs:
value = json_dict[key_in_json]
if key in custom_serialization_initializers:
# Use custom initializer if provided
setattr(instance, key, custom_serialization_initializers[key](value))
elif isinstance(value, dict) and serialization_type_field_name in value:
# Assume it's another JsonSerializableRegistry object
setattr(instance, key, JsonSerializableRegistry.from_json(value, serialization_type_field_name=serialization_type_field_name))
elif isinstance(value, list):
# Handle collections, recursively deserialize if items are JsonSerializableRegistry objects
deserialized_collection = []
for item in value:
if isinstance(item, dict) and serialization_type_field_name in item:
deserialized_collection.append(JsonSerializableRegistry.from_json(item, serialization_type_field_name=serialization_type_field_name))
else:
deserialized_collection.append(copy.deepcopy(item))
setattr(instance, key, deserialized_collection)
else:
setattr(instance, key, copy.deepcopy(value))
# Call post-deserialization initialization if available
if hasattr(instance, '_post_deserialization_init') and callable(instance._post_deserialization_init):
post_init_params = post_init_params if post_init_params else {}
instance._post_deserialization_init(**post_init_params)
return instance
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
# Register the subclass using its name as the key
JsonSerializableRegistry.class_mapping[cls.__name__] = cls
# Automatically extend serializable attributes and custom initializers from parent classes
if hasattr(cls, 'serializable_attributes') and isinstance(cls.serializable_attributes, list):
for base in cls.__bases__:
if hasattr(base, 'serializable_attributes') and isinstance(base.serializable_attributes, list):
cls.serializable_attributes = list(set(base.serializable_attributes + cls.serializable_attributes))
if hasattr(cls, 'suppress_attributes_from_serialization') and isinstance(cls.suppress_attributes_from_serialization, list):
for base in cls.__bases__:
if hasattr(base, 'suppress_attributes_from_serialization') and isinstance(base.suppress_attributes_from_serialization, list):
cls.suppress_attributes_from_serialization = list(set(base.suppress_attributes_from_serialization + cls.suppress_attributes_from_serialization))
if hasattr(cls, 'custom_serialization_initializers') and isinstance(cls.custom_serialization_initializers, dict):
for base in cls.__bases__:
if hasattr(base, 'custom_serialization_initializers') and isinstance(base.custom_serialization_initializers, dict):
base_initializers = base.custom_serialization_initializers.copy()
base_initializers.update(cls.custom_serialization_initializers)
cls.custom_serialization_initializers = base_initializers
def _post_deserialization_init(self, **kwargs):
# if there's a _post_init method, call it after deserialization
if hasattr(self, '_post_init'):
self._post_init(**kwargs)
@classmethod
def _programmatic_name_to_json_name(cls, name):
"""
Converts a programmatic name to a JSON name by converting it to snake case.
"""
if hasattr(cls, 'serializable_attributes_renaming') and isinstance(cls.serializable_attributes_renaming, dict):
return cls.serializable_attributes_renaming.get(name, name)
return name
@classmethod
def _json_name_to_programmatic_name(cls, name):
"""
Converts a JSON name to a programmatic name.
"""
if hasattr(cls, 'serializable_attributes_renaming') and isinstance(cls.serializable_attributes_renaming, dict):
reverse_rename = {}
for k, v in cls.serializable_attributes_renaming.items():
if v in reverse_rename:
raise ValueError(f"Duplicate value '{v}' found in serializable_attributes_renaming.")
reverse_rename[v] = k
return reverse_rename.get(name, name)
return name
def post_init(cls):
"""
Decorator to enforce a post-initialization method call in a class, if it has one.
The method must be named `_post_init`.
"""
original_init = cls.__init__
def new_init(self, *args, **kwargs):
original_init(self, *args, **kwargs)
if hasattr(cls, '_post_init'):
cls._post_init(self)
cls.__init__ = new_init
return cls
def merge_dicts(current, additions, overwrite=False, error_on_conflict=True):
"""
Merges two dictionaries and returns a new dictionary. Works as follows:
- If a key exists in the additions dictionary but not in the current dictionary, it is added.
- If a key maps to None in the current dictionary, it is replaced by the value in the additions dictionary.
- If a key exists in both dictionaries and the values are dictionaries, the function is called recursively.
- If a key exists in both dictionaries and the values are lists, the lists are concatenated and duplicates are removed.
- If the values are of different types, an exception is raised.
- If the values are of the same type but not both lists/dictionaries, the value from the additions dictionary overwrites the value in the current dictionary based on the overwrite parameter.
Parameters:
- current (dict): The original dictionary.
- additions (dict): The dictionary with values to add.
- overwrite (bool): Whether to overwrite values if they are of the same type but not both lists/dictionaries.
- error_on_conflict (bool): Whether to raise an error if there is a conflict and overwrite is False.
Returns:
- dict: A new dictionary with merged values.
"""
merged = current.copy() # Create a copy of the current dictionary to avoid altering it
for key in additions:
if key in merged:
# If the current value is None, directly assign the new value
if merged[key] is None:
merged[key] = additions[key]
# If both values are dictionaries, merge them recursively
elif isinstance(merged[key], dict) and isinstance(additions[key], dict):
merged[key] = merge_dicts(merged[key], additions[key], overwrite, error_on_conflict)
# If both values are lists, concatenate them and remove duplicates
elif isinstance(merged[key], list) and isinstance(additions[key], list):
merged[key].extend(additions[key])
# Remove duplicates while preserving order
merged[key] = remove_duplicates(merged[key])
# If the values are of different types, raise an exception
elif type(merged[key]) != type(additions[key]):
raise TypeError(f"Cannot merge different types: {type(merged[key])} and {type(additions[key])} for key '{key}'")
# If the values are of the same type but not both lists/dictionaries, decide based on the overwrite parameter
else:
if overwrite:
merged[key] = additions[key]
elif merged[key] != additions[key]:
if error_on_conflict:
raise ValueError(f"Conflict at key '{key}': overwrite is set to False and values are different.")
else:
continue # Ignore the conflict and continue
else:
# If the key is not present in merged, add it from additions
merged[key] = additions[key]
return merged
def remove_duplicates(lst):
"""
Removes duplicates from a list while preserving order.
Handles unhashable elements by using a list comprehension.
Parameters:
- lst (list): The list to remove duplicates from.
Returns:
- list: A new list with duplicates removed.
"""
seen = []
result = []
for item in lst:
if isinstance(item, dict):
# Convert dict to a frozenset of its items to make it hashable
item_key = frozenset(item.items())
else:
item_key = item
if item_key not in seen:
seen.append(item_key)
result.append(item)
return result