Spaces:
Sleeping
Sleeping
import os | |
import base64 | |
import json | |
import io | |
import datetime | |
from PIL import Image | |
import logging | |
from huggingface_hub import HfApi, CommitScheduler | |
import numpy as np | |
logger = logging.getLogger(__name__) | |
HF_DATASET_NAME = "aiwithoutborders-xyz/degentic_rd0" | |
LOCAL_LOG_DIR = "./hf_inference_logs" | |
# Custom JSON Encoder to handle numpy types | |
class NumpyEncoder(json.JSONEncoder): | |
def default(self, obj): | |
if isinstance(obj, np.float32): | |
return float(obj) | |
return json.JSONEncoder.default(self, obj) | |
def _save_pil_image_to_file(image: Image.Image, directory: str, prefix: str) -> str: | |
"""Saves a PIL Image to a file and returns its filename.""" | |
if not isinstance(image, Image.Image): | |
raise TypeError(f"Expected a PIL Image, but received type: {type(image)}") | |
os.makedirs(directory, exist_ok=True) | |
timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f") | |
filename = f"{prefix}_{timestamp_str}.png" | |
file_path = os.path.join(directory, filename) | |
if image.mode != 'RGB': | |
image = image.convert('RGB') | |
image.save(file_path, format="PNG") | |
logger.info(f"Saved image to: {file_path}") | |
return filename | |
# The initialize_dataset function will change significantly or be removed/simplified | |
# as we are no longer appending to a datasets.Dataset object directly in memory | |
def initialize_dataset_repo(): | |
"""Initializes or ensures the Hugging Face dataset repository exists.""" | |
api = HfApi(token=os.getenv("HF_TOKEN")) | |
try: | |
api.repo_info(repo_id=HF_DATASET_NAME, repo_type="dataset") | |
logger.info(f"Hugging Face dataset repository already exists: {HF_DATASET_NAME}") | |
except Exception: | |
logger.info(f"Creating new Hugging Face dataset repository: {HF_DATASET_NAME}") | |
api.create_repo(repo_id=HF_DATASET_NAME, repo_type="dataset", private=True) | |
return api # Return the API object for subsequent operations | |
def log_inference_data( | |
original_image: Image.Image, | |
inference_params: dict, | |
model_predictions: list[dict], | |
ensemble_output: dict, | |
forensic_images: list[Image.Image], | |
agent_monitoring_data: dict, | |
human_feedback: dict = None | |
): | |
"""Logs a single inference event by uploading a JSON file to the Hugging Face dataset repository.""" | |
try: | |
api = initialize_dataset_repo() # Get or create the repository | |
original_image_filename = _save_pil_image_to_file(original_image, LOCAL_LOG_DIR, "original") | |
forensic_images_filenames = [] | |
for img_item in forensic_images: | |
if img_item is not None: | |
if not isinstance(img_item, Image.Image): | |
try: | |
img_item = Image.fromarray(img_item) | |
except Exception as e: | |
logger.error(f"Error converting forensic image to PIL for saving: {e}") | |
continue | |
forensic_images_filenames.append(_save_pil_image_to_file(img_item, LOCAL_LOG_DIR, "forensic")) | |
new_entry = { | |
"timestamp": datetime.datetime.now().isoformat(), | |
"image": original_image_filename, | |
"inference_request": inference_params, | |
"model_predictions": model_predictions, | |
"ensemble_output": ensemble_output, | |
"forensic_outputs": forensic_images_filenames, | |
"agent_monitoring_data": agent_monitoring_data, | |
"human_feedback": human_feedback if human_feedback is not None else {} | |
} | |
# Define a unique path for the new log file within the local directory | |
os.makedirs(LOCAL_LOG_DIR, exist_ok=True) # Ensure the local directory exists | |
timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f") | |
log_file_path = os.path.join(LOCAL_LOG_DIR, f"log_{timestamp_str}.json") | |
# Serialize the new entry to a JSON file using the custom encoder | |
with open(log_file_path, 'w', encoding='utf-8') as f: | |
json.dump(new_entry, f, cls=NumpyEncoder, indent=2) | |
# Schedule commit to Hugging Face dataset repository | |
scheduler = CommitScheduler( | |
repo_id=HF_DATASET_NAME, | |
repo_type="dataset", | |
folder_path=LOCAL_LOG_DIR, | |
path_in_repo="logs", | |
token=os.getenv("HF_TOKEN"), | |
every=10 # Commit every 10 files | |
) | |
with scheduler: | |
logger.info(f"Inference data logged successfully to local file: {log_file_path}") | |
except Exception as e: | |
logger.error(f"Failed to log inference data to local file: {e}") |