# core/process.py
import logging
import time
import timeout_decorator  # only needed if the per-image timeout decorator below is re-enabled
import io
import os
import zipfile
import json
import cv2
import torch
import numpy as np
from PIL import Image
from registry import get_model
from core.describe_scene import describe_scene
from utils.helpers import generate_session_id, log_runtime
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Model mappings
DETECTION_MODEL_MAP = {
"YOLOv8-Nano": "yolov8n",
"YOLOv8-Small": "yolov8s",
"YOLOv8-Large": "yolov8l",
"YOLOv11-Beta": "yolov11b"
}
SEGMENTATION_MODEL_MAP = {
"SegFormer-B0": "segformer_b0",
"SegFormer-B5": "segformer_b5",
"DeepLabV3-ResNet50": "deeplabv3_resnet50"
}
DEPTH_MODEL_MAP = {
"MiDaS v21 Small 256": "midas_v21_small_256",
"MiDaS v21 384": "midas_v21_384",
"DPT Hybrid 384": "dpt_hybrid_384",
"DPT Swin2 Large 384": "dpt_swin2_large_384",
"DPT Beit Large 512": "dpt_beit_large_512"
}
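
# NOTE: These display-name -> registry-key maps are not consumed in this module;
# presumably the UI layer resolves a display name such as "YOLOv8-Nano" to its
# registry key ("yolov8n") before calling process_image() below.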
# @timeout_decorator.timeout(35, use_signals=False)  # optional: 35 s limit per image
def process_image(
image: Image.Image,
run_det: bool,
det_model: str,
det_confidence: float,
run_seg: bool,
seg_model: str,
run_depth: bool,
depth_model: str,
blend: float
):
"""
Runs selected perception tasks on the input image and packages results.
Args:
image (PIL.Image): Input image.
run_det (bool): Run object detection.
det_model (str): Detection model key.
det_confidence (float): Detection confidence threshold.
run_seg (bool): Run segmentation.
seg_model (str): Segmentation model key.
run_depth (bool): Run depth estimation.
depth_model (str): Depth model key.
blend (float): Overlay blend alpha (0.0 - 1.0).
    Returns:
        Tuple[Image.Image, dict, str]: Final blended image, scene description
        dict (with telemetry attached), and filesystem path to the results ZIP.
"""
logger.info("Starting image processing pipeline.")
start_time = time.time()
outputs, scene = {}, {}
combined_np = np.array(image)
try:
# Detection
if run_det:
logger.info(f"Running detection with model: {det_model}")
load_start = time.time()
model = get_model("detection", det_model, device="cpu")
model.load_model()
logger.info(f"{det_model} detection model loaded in {time.time() - load_start:.2f} seconds.")
boxes = model.predict(image, conf_threshold=det_confidence)
overlay = model.draw(image, boxes)
combined_np = np.array(overlay)
buf = io.BytesIO()
overlay.save(buf, format="PNG")
outputs["detection.png"] = buf.getvalue()
scene["detection"] = boxes
# Segmentation
if run_seg:
logger.info(f"Running segmentation with model: {seg_model}")
load_start = time.time()
model = get_model("segmentation", seg_model, device="cpu")
logger.info(f"{seg_model} segmentation model loaded in {time.time() - load_start:.2f} seconds.")
mask = model.predict(image)
overlay = model.draw(image, mask, alpha=blend)
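            # Note: model.draw() already composites the mask at alpha=blend, and
            # addWeighted() blends that overlay in again, so the effective mask
            # opacity in the final composite is roughly blend**2.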
combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(overlay), blend, 0)
buf = io.BytesIO()
overlay.save(buf, format="PNG")
outputs["segmentation.png"] = buf.getvalue()
scene["segmentation"] = mask.tolist()
# Depth Estimation
if run_depth:
logger.info(f"Running depth estimation with model: {depth_model}")
load_start = time.time()
model = get_model("depth", depth_model, device="cpu")
logger.info(f"{depth_model} depth model loaded in {time.time() - load_start:.2f} seconds.")
dmap = model.predict(image)
            # Normalize to 0-255 for visualization; the epsilon guards a constant map
            # (np.ptp(dmap) rather than dmap.ptp(), which NumPy 2.0 removed).
            norm_dmap = ((dmap - dmap.min()) / (np.ptp(dmap) + 1e-8) * 255).astype(np.uint8)
d_pil = Image.fromarray(norm_dmap)
combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(d_pil.convert("RGB")), blend, 0)
buf = io.BytesIO()
d_pil.save(buf, format="PNG")
outputs["depth_map.png"] = buf.getvalue()
scene["depth"] = dmap.tolist()
# Final image overlay
final_img = Image.fromarray(combined_np)
buf = io.BytesIO()
final_img.save(buf, format="PNG")
outputs["scene_blueprint.png"] = buf.getvalue()
# Scene description
try:
scene_json = describe_scene(**scene)
except Exception as e:
logger.warning(f"describe_scene failed: {e}")
scene_json = {"error": str(e)}
telemetry = {
"session_id": generate_session_id(),
"runtime_sec": round(log_runtime(start_time), 2),
"used_models": {
"detection": det_model if run_det else None,
"segmentation": seg_model if run_seg else None,
"depth": depth_model if run_depth else None
}
}
scene_json["telemetry"] = telemetry
outputs["scene_description.json"] = json.dumps(scene_json, indent=2).encode("utf-8")
# ZIP file creation
zip_buf = io.BytesIO()
with zipfile.ZipFile(zip_buf, "w") as zipf:
for name, data in outputs.items():
zipf.writestr(name, data)
elapsed = log_runtime(start_time)
logger.info(f"Image processing completed in {elapsed:.2f} seconds.")
        # Save the ZIP to disk for Gradio's file output (an earlier revision
        # returned the bytes in-memory as ("uvis_results.zip", zip_buf.getvalue())).
zip_path = "outputs/uvis_results.zip"
os.makedirs("outputs", exist_ok=True)
with open(zip_path, "wb") as f:
f.write(zip_buf.getvalue())
return final_img, scene_json, zip_path
    except Exception as e:
        logger.exception(f"Error in processing pipeline: {e}")
        return None, {"error": str(e)}, None