# Spaces: Running on Zero
import logging | |
import time | |
import timeout_decorator | |
import io | |
import os | |
import zipfile | |
import json | |
import cv2 | |
import torch | |
import numpy as np | |
from PIL import Image | |
from registry import get_model | |
from core.describe_scene import describe_scene | |
from utils.helpers import generate_session_id, log_runtime | |
# Setup logging: configure the root logger once at import time (INFO level,
# timestamped records) and create this module's logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Model mappings
# Human-readable detection model name -> short model identifier
# (presumably the key expected by registry.get_model — confirm against callers).
DETECTION_MODEL_MAP = {
    "YOLOv8-Nano": "yolov8n",
    "YOLOv8-Small": "yolov8s",
    "YOLOv8-Large": "yolov8l",
    "YOLOv11-Beta": "yolov11b",
}
# Human-readable segmentation model name -> short model identifier
# (presumably the key expected by registry.get_model — confirm against callers).
SEGMENTATION_MODEL_MAP = {
    "SegFormer-B0": "segformer_b0",
    "SegFormer-B5": "segformer_b5",
    "DeepLabV3-ResNet50": "deeplabv3_resnet50",
}
# Human-readable depth model name -> short model identifier
# (presumably the key expected by registry.get_model — confirm against callers).
DEPTH_MODEL_MAP = {
    "MiDaS v21 Small 256": "midas_v21_small_256",
    "MiDaS v21 384": "midas_v21_384",
    "DPT Hybrid 384": "dpt_hybrid_384",
    "DPT Swin2 Large 384": "dpt_swin2_large_384",
    "DPT Beit Large 512": "dpt_beit_large_512",
}
#@timeout_decorator.timeout(35, use_signals=False) # 35 sec limit per image | |
def process_image(
    image: Image.Image,
    run_det: bool,
    det_model: str,
    det_confidence: float,
    run_seg: bool,
    seg_model: str,
    run_depth: bool,
    depth_model: str,
    blend: float
):
    """
    Runs selected perception tasks on the input image and packages results.

    Each enabled task (detection, segmentation, depth) produces a PNG that is
    added to the result archive and contributes to a combined overlay image.
    The raw task outputs are passed to ``describe_scene`` and the resulting
    JSON, extended with a ``telemetry`` entry, is added to the archive too.

    Args:
        image (PIL.Image): Input image. Converted to RGB before use.
        run_det (bool): Run object detection.
        det_model (str): Detection model key.
        det_confidence (float): Detection confidence threshold.
        run_seg (bool): Run segmentation.
        seg_model (str): Segmentation model key.
        run_depth (bool): Run depth estimation.
        depth_model (str): Depth model key.
        blend (float): Overlay blend alpha (0.0 - 1.0).

    Returns:
        Tuple[Image, dict, str]: Final combined image, scene JSON, and the
        path of the ZIP archive written to disk. If any step raises, the
        error is logged and ``(None, {"error": <message>}, None)`` is
        returned instead; this function does not propagate exceptions.
    """
    logger.info("Starting image processing pipeline.")
    start_time = time.time()
    outputs, scene = {}, {}

    try:
        if image is None:
            raise ValueError("No input image provided.")

        # The depth branch blends a 3-channel RGB array into the running
        # overlay, so the base image must be 3-channel as well; RGBA, L or P
        # inputs would otherwise make cv2.addWeighted fail on a shape mismatch.
        if image.mode != "RGB":
            image = image.convert("RGB")
        combined_np = np.array(image)

        # Detection
        if run_det:
            logger.info(f"Running detection with model: {det_model}")
            load_start = time.time()
            model = get_model("detection", det_model, device="cpu")
            model.load_model()
            logger.info(f"{det_model} detection model loaded in {time.time() - load_start:.2f} seconds.")
            boxes = model.predict(image, conf_threshold=det_confidence)
            overlay = model.draw(image, boxes)
            # The detection overlay replaces the base image outright; later
            # stages are alpha-blended on top of it.
            combined_np = np.array(overlay)
            outputs["detection.png"] = _encode_png(overlay)
            scene["detection"] = boxes

        # Segmentation
        if run_seg:
            logger.info(f"Running segmentation with model: {seg_model}")
            load_start = time.time()
            # NOTE(review): unlike detection, load_model() is not called here;
            # presumably get_model returns a ready-to-use model — confirm.
            model = get_model("segmentation", seg_model, device="cpu")
            logger.info(f"{seg_model} segmentation model loaded in {time.time() - load_start:.2f} seconds.")
            mask = model.predict(image)
            overlay = model.draw(image, mask, alpha=blend)
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(overlay), blend, 0)
            outputs["segmentation.png"] = _encode_png(overlay)
            scene["segmentation"] = mask.tolist()

        # Depth Estimation
        if run_depth:
            logger.info(f"Running depth estimation with model: {depth_model}")
            load_start = time.time()
            # NOTE(review): no load_model() call here either — confirm.
            model = get_model("depth", depth_model, device="cpu")
            logger.info(f"{depth_model} depth model loaded in {time.time() - load_start:.2f} seconds.")
            dmap = model.predict(image)
            d_pil = Image.fromarray(_normalize_to_uint8(dmap))
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(d_pil.convert("RGB")), blend, 0)
            outputs["depth_map.png"] = _encode_png(d_pil)
            scene["depth"] = dmap.tolist()

        # Final image overlay
        final_img = Image.fromarray(combined_np)
        outputs["scene_blueprint.png"] = _encode_png(final_img)

        # Scene description: best effort, a failure here must not discard the
        # images that were already produced.
        try:
            scene_json = describe_scene(**scene)
        except Exception as e:
            logger.warning(f"describe_scene failed: {e}")
            scene_json = {"error": str(e)}

        scene_json["telemetry"] = {
            "session_id": generate_session_id(),
            "runtime_sec": round(log_runtime(start_time), 2),
            "used_models": {
                "detection": det_model if run_det else None,
                "segmentation": seg_model if run_seg else None,
                "depth": depth_model if run_depth else None
            }
        }
        outputs["scene_description.json"] = json.dumps(scene_json, indent=2).encode("utf-8")

        # ZIP file creation: saved to disk because the Gradio file output
        # needs a path rather than in-memory bytes.
        # NOTE(review): the path is fixed, so every call overwrites the
        # previous archive.
        zip_path = "outputs/uvis_results.zip"
        os.makedirs("outputs", exist_ok=True)
        with zipfile.ZipFile(zip_path, "w") as zipf:
            for name, data in outputs.items():
                zipf.writestr(name, data)

        elapsed = log_runtime(start_time)
        logger.info(f"Image processing completed in {elapsed:.2f} seconds.")
        return final_img, scene_json, zip_path

    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Error in processing pipeline: {e}")
        return None, {"error": str(e)}, None


def _encode_png(img):
    """Return the PNG-encoded bytes of a PIL image."""
    buf = io.BytesIO()
    img.save(buf, format="PNG")
    return buf.getvalue()


def _normalize_to_uint8(values):
    """Min-max scale an array to the 0-255 uint8 range.

    A constant array has zero range; it maps to all zeros instead of
    dividing by zero. ``np.ptp`` is used because the ``ndarray.ptp`` method
    was removed in NumPy 2.0.
    """
    values = np.asarray(values)
    span = np.ptp(values)
    if span == 0:
        return np.zeros(values.shape, dtype=np.uint8)
    return ((values - values.min()) / span * 255).astype(np.uint8)