import logging
import time
import timeout_decorator
import io
import os
import zipfile
import json

import cv2
import torch
import numpy as np
from PIL import Image

from registry import get_model
from core.describe_scene import describe_scene
from utils.helpers import generate_session_id, log_runtime

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Model mappings
DETECTION_MODEL_MAP = {
    "YOLOv8-Nano": "yolov8n",
    "YOLOv8-Small": "yolov8s",
    "YOLOv8-Large": "yolov8l",
    "YOLOv11-Beta": "yolov11b"
}

SEGMENTATION_MODEL_MAP = {
    "SegFormer-B0": "segformer_b0",
    "SegFormer-B5": "segformer_b5",
    "DeepLabV3-ResNet50": "deeplabv3_resnet50"
}

DEPTH_MODEL_MAP = {
    "MiDaS v21 Small 256": "midas_v21_small_256",
    "MiDaS v21 384": "midas_v21_384",
    "DPT Hybrid 384": "dpt_hybrid_384",
    "DPT Swin2 Large 384": "dpt_swin2_large_384",
    "DPT Beit Large 512": "dpt_beit_large_512"
}


def process_video(
    video_path: str,
    run_det: bool,
    det_model: str,
    det_confidence: float,
    run_seg: bool,
    seg_model: str,
    run_depth: bool,
    depth_model: str,
    blend: float
):
    """
    Reads each frame from `video_path`, runs the existing `process_image()` pipeline on it,
    and writes a new MP4 to outputs/processed_<original filename>.

    Returns:
        (None, scene_json: dict, output_video_path: str)
    """
    logger.info(f"Starting video processing for {video_path}")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")

    os.makedirs("outputs", exist_ok=True)
    base = os.path.basename(video_path)
    out_path = os.path.join("outputs", f"processed_{base}")

    writer = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
    if not writer.isOpened():
        cap.release()
        raise RuntimeError(f"Cannot write to: {out_path}")

    frame_idx = 0
    scene_info = {"video": base, "frames_processed": 0}

    while True:
        ret, frame_bgr = cap.read()
        if not ret:
            break

        # BGR -> RGB -> PIL
        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        pil_frame = Image.fromarray(frame_rgb)

        # Run the existing image pipeline on this frame
        try:
            processed_img, _, _ = process_image(
                image=pil_frame,
                run_det=run_det,
                det_model=det_model,
                det_confidence=det_confidence,
                run_seg=run_seg,
                seg_model=seg_model,
                run_depth=run_depth,
                depth_model=depth_model,
                blend=blend
            )
        except Exception as e:
            cap.release()
            writer.release()
            raise RuntimeError(f"Error on frame {frame_idx}: {e}")

        # PIL -> BGR numpy for the video writer
        out_bgr = cv2.cvtColor(np.array(processed_img), cv2.COLOR_RGB2BGR)
        writer.write(out_bgr)

        frame_idx += 1
        scene_info["frames_processed"] = frame_idx

    cap.release()
    writer.release()
    logger.info(f"Finished video. Wrote {frame_idx} frames to {out_path}")

    # Minimal JSON summary
    scene_json = {
        "video": scene_info["video"],
        "frames_processed": scene_info["frames_processed"]
    }
    return None, scene_json, out_path
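
# Usage sketch (illustrative only): how process_video might be called from an app layer.
# The clip path below is a placeholder, and the model names assume the registry accepts
# the display keys listed in the *_MODEL_MAP dicts above -- adjust both to your setup.
#
#   _, summary, out_mp4 = process_video(
#       video_path="samples/street.mp4",
#       run_det=True, det_model="YOLOv8-Nano", det_confidence=0.4,
#       run_seg=False, seg_model="SegFormer-B0",
#       run_depth=True, depth_model="MiDaS v21 Small 256",
#       blend=0.5,
#   )
#   # summary -> {"video": "street.mp4", "frames_processed": <N>}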

# @timeout_decorator.timeout(35, use_signals=False)  # 35 sec limit per image
def process_image(
    image: Image.Image,
    run_det: bool,
    det_model: str,
    det_confidence: float,
    run_seg: bool,
    seg_model: str,
    run_depth: bool,
    depth_model: str,
    blend: float
):
    """
    Runs selected perception tasks on the input image and packages results.

    Args:
        image (PIL.Image): Input image.
        run_det (bool): Run object detection.
        det_model (str): Detection model key.
        det_confidence (float): Detection confidence threshold.
        run_seg (bool): Run segmentation.
        seg_model (str): Segmentation model key.
        run_depth (bool): Run depth estimation.
        depth_model (str): Depth model key.
        blend (float): Overlay blend alpha (0.0 - 1.0).

    Returns:
        Tuple[Image, dict, str]: Final image, scene JSON, and path to the downloadable ZIP.
    """
    logger.info("Starting image processing pipeline.")
    start_time = time.time()
    outputs, scene = {}, {}
    combined_np = np.array(image)

    try:
        # Detection
        if run_det:
            logger.info(f"Running detection with model: {det_model}")
            load_start = time.time()
            model = get_model("detection", det_model, device="cpu")
            model.load_model()
            logger.info(f"{det_model} detection model loaded in {time.time() - load_start:.2f} seconds.")

            boxes = model.predict(image, conf_threshold=det_confidence)
            overlay = model.draw(image, boxes)
            combined_np = np.array(overlay)

            buf = io.BytesIO()
            overlay.save(buf, format="PNG")
            outputs["detection.png"] = buf.getvalue()
            scene["detection"] = boxes

        # Segmentation
        if run_seg:
            logger.info(f"Running segmentation with model: {seg_model}")
            load_start = time.time()
            model = get_model("segmentation", seg_model, device="cpu")
            logger.info(f"{seg_model} segmentation model loaded in {time.time() - load_start:.2f} seconds.")

            mask = model.predict(image)
            overlay = model.draw(image, mask, alpha=blend)
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(overlay), blend, 0)

            buf = io.BytesIO()
            overlay.save(buf, format="PNG")
            outputs["segmentation.png"] = buf.getvalue()
            scene["segmentation"] = mask.tolist()

        # Depth Estimation
        if run_depth:
            logger.info(f"Running depth estimation with model: {depth_model}")
            load_start = time.time()
            model = get_model("depth", depth_model, device="cpu")
            logger.info(f"{depth_model} depth model loaded in {time.time() - load_start:.2f} seconds.")

            dmap = model.predict(image)
            # Normalize depth map to 0-255 for visualization (guard against a flat map)
            dmap_range = float(dmap.max() - dmap.min()) or 1.0
            norm_dmap = ((dmap - dmap.min()) / dmap_range * 255).astype(np.uint8)
            d_pil = Image.fromarray(norm_dmap)
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(d_pil.convert("RGB")), blend, 0)

            buf = io.BytesIO()
            d_pil.save(buf, format="PNG")
            outputs["depth_map.png"] = buf.getvalue()
            scene["depth"] = dmap.tolist()

        # Final image overlay
        final_img = Image.fromarray(combined_np)
        buf = io.BytesIO()
        final_img.save(buf, format="PNG")
        outputs["scene_blueprint.png"] = buf.getvalue()

        # Scene description
        try:
            scene_json = describe_scene(**scene)
        except Exception as e:
            logger.warning(f"describe_scene failed: {e}")
            scene_json = {"error": str(e)}

        telemetry = {
            "session_id": generate_session_id(),
            "runtime_sec": round(log_runtime(start_time), 2),
            "used_models": {
                "detection": det_model if run_det else None,
                "segmentation": seg_model if run_seg else None,
                "depth": depth_model if run_depth else None
            }
        }
        scene_json["telemetry"] = telemetry

        outputs["scene_description.json"] = json.dumps(scene_json, indent=2).encode("utf-8")

        # ZIP file creation
        zip_buf = io.BytesIO()
        with zipfile.ZipFile(zip_buf, "w") as zipf:
            for name, data in outputs.items():
                zipf.writestr(name, data)

        elapsed = log_runtime(start_time)
        logger.info(f"Image processing completed in {elapsed:.2f} seconds.")

        # Previous in-memory API:
        # return final_img, scene_json, ("uvis_results.zip", zip_buf.getvalue())

        # Save ZIP to disk for Gradio file output
        zip_path = "outputs/uvis_results.zip"
        os.makedirs("outputs", exist_ok=True)
        with open(zip_path, "wb") as f:
            f.write(zip_buf.getvalue())

        return final_img, scene_json, zip_path

    except Exception as e:
        logger.error(f"Error in processing pipeline: {e}")
        return None, {"error": str(e)}, None
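
if __name__ == "__main__":
    # Minimal smoke test (illustrative sketch, not part of the pipeline). It assumes a
    # local image exists at samples/demo.jpg and that the registry resolves the
    # "YOLOv8-Nano" key on CPU -- adjust the path and model keys to your environment.
    demo_img = Image.open("samples/demo.jpg").convert("RGB")
    result_img, demo_scene, demo_zip = process_image(
        image=demo_img,
        run_det=True, det_model="YOLOv8-Nano", det_confidence=0.4,
        run_seg=False, seg_model="SegFormer-B0",
        run_depth=False, depth_model="MiDaS v21 Small 256",
        blend=0.5,
    )
    logger.info(f"Smoke test done. Scene keys: {list(demo_scene.keys())}, ZIP: {demo_zip}")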