gradio-Ui / video.py
Nour190's picture
video.py
444b501 verified
# video.py
"""
Processing logic for shoplifting detection (streaming version).
Exposes process_video_stream(...) generator which yields updates while processing.
Each yielded update is a dict with keys:
- status: human-readable status string
- live_frame: path to latest suspicious frame image (or "")
- suspicious_list: list of saved suspicious image paths so far
- csv_path: path to CSV log (updated)
- annotated_video: path to final annotated video (only on final yield)
"""
import os
import time
import csv
import json
import base64
import cv2
from ultralytics import YOLO
import logging
# basic logger for these modules
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Optional OpenRouter client - only used if key provided.
try:
from openai import OpenAI
except Exception:
OpenAI = None
def _ensure_dir(p):
    """Create directory *p*, including missing parents; no error if it already exists."""
    os.makedirs(name=p, exist_ok=True)
def _build_attachment_part(payload, filename):
    """Return a base64-encoded MIME part carrying *payload* as an attachment named *filename*."""
    import mimetypes
    from email import encoders
    from email.mime.base import MIMEBase

    ctype, _ = mimetypes.guess_type(filename)
    # Unknown extensions fall back to a generic binary content type.
    maintype, subtype = ctype.split("/", 1) if ctype else ("application", "octet-stream")
    part = MIMEBase(maintype, subtype)
    part.set_payload(payload)
    encoders.encode_base64(part)
    # Passing the filename as a parameter lets the email package quote/encode it.
    part.add_header("Content-Disposition", "attachment", filename=filename)
    return part


def _split_recipients(email_to):
    """Normalise *email_to* (string with ','/';' separators, or an iterable) to a list of addresses."""
    if not email_to:
        return []
    if isinstance(email_to, str):
        candidates = email_to.replace(";", ",").split(",")
    else:
        candidates = list(email_to)
    return [str(addr).strip() for addr in candidates if str(addr).strip()]


def send_email_with_attachment(smtp_cfg, subject, html_body, attachment_path=None, attachment_bytes=None, attachment_filename=None):
    """
    Send an HTML email, optionally with one attachment, using the given SMTP settings.

    smtp_cfg: dict with smtp_server, smtp_port, email_user, email_pass, email_to, enabled.
        email_to may be a single address, several addresses separated by ',' or ';',
        or an iterable of addresses. smtp_port defaults to 587 when missing or blank.
    subject: subject line.
    html_body: message body, sent as text/html.
    attachment_path / attachment_bytes: either may be provided; attachment_bytes wins
        when both are given. A non-existent attachment_path is logged and the mail is
        sent without an attachment.
    attachment_filename: name shown for the attachment. Defaults to 'attachment.jpg'
        for attachment_bytes and to the basename of attachment_path otherwise.

    Returns (success: bool, timestamp_str or error_str). Never raises for send
    failures: the exception is logged and its text is returned as the error string.
    """
    if not smtp_cfg or not smtp_cfg.get("enabled"):
        return False, "smtp_disabled"
    try:
        import smtplib
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText
        from email.utils import formatdate

        smtp_server = smtp_cfg.get("smtp_server")
        # `or 587` also covers a None / empty-string port coming from a form field.
        smtp_port = int(smtp_cfg.get("smtp_port") or 587)
        email_user = smtp_cfg.get("email_user")
        email_pass = smtp_cfg.get("email_pass")
        recipients = _split_recipients(smtp_cfg.get("email_to"))
        if not (smtp_server and smtp_port and email_user and email_pass and recipients):
            logger.warning("Incomplete SMTP config — cannot send email.")
            return False, "incomplete_smtp_cfg"

        msg = MIMEMultipart()
        msg["From"] = email_user
        msg["To"] = ", ".join(recipients)
        msg["Date"] = formatdate(localtime=True)
        msg["Subject"] = subject
        msg.attach(MIMEText(html_body, "html"))

        # Resolve the attachment payload once, then build the MIME part in one place.
        payload = None
        filename = attachment_filename
        if attachment_bytes is not None:
            payload = attachment_bytes
            filename = filename or "attachment.jpg"
        elif attachment_path is not None:
            if os.path.exists(attachment_path):
                with open(attachment_path, "rb") as f:
                    payload = f.read()
                filename = filename or os.path.basename(attachment_path)
            else:
                logger.warning("Attachment path does not exist: %s", attachment_path)
        if payload is not None:
            msg.attach(_build_attachment_part(payload, filename))

        # Port 465 is implicit TLS; any other port starts in plaintext and upgrades.
        use_implicit_tls = smtp_port == 465
        if use_implicit_tls:
            connection = smtplib.SMTP_SSL(smtp_server, smtp_port, timeout=20)
        else:
            connection = smtplib.SMTP(smtp_server, smtp_port, timeout=20)

        # The context manager issues QUIT and closes the socket even when
        # login/sendmail raise, so a failed send no longer leaks the connection.
        with connection as server:
            if not use_implicit_tls:
                server.ehlo()
                try:
                    server.starttls()
                except Exception as e:
                    # Deliberate best effort: continue without TLS.
                    # NOTE(review): credentials are then sent unencrypted.
                    logger.warning("starttls failed or not supported: %s", e)
            server.login(email_user, email_pass)
            server.sendmail(email_user, recipients, msg.as_string())

        ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        return True, ts
    except Exception as e:
        logger.exception("[EMAIL ERROR]")
        return False, str(e)
def _format_bbox_fields(bbox):
    """Return (bbox_str, x1, y1, x2, y2) for the CSV; all empty strings when bbox is unusable."""
    if not bbox:
        return "", "", "", "", ""
    try:
        x1, y1, x2, y2 = (int(v) for v in bbox)
    except (TypeError, ValueError):
        return "", "", "", "", ""
    return f"{x1},{y1},{x2},{y2}", x1, y1, x2, y2


def _append_log_row(log_csv, video_path, frame_idx, conf_val, image_path,
                    bbox=None, crop_path="", email_sent=False, email_ts="", notes=""):
    """Append one detection row to *log_csv* in the fixed 16-column order."""
    bbox_str, x1, y1, x2, y2 = _format_bbox_fields(bbox)
    row = [
        f"frame_{frame_idx:06d}",
        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        video_path, frame_idx, "shoplifting",
        f"{conf_val:.3f}" if conf_val is not None else "",
        bbox_str, x1, y1, x2, y2,
        image_path, crop_path,
        str(email_sent), email_ts,
        notes,
    ]
    with open(log_csv, "a", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(row)


def _parse_model_reply(observation):
    """Extract the outermost {...} span of the reply as a dict, else return {"raw": observation}."""
    start = observation.find("{")
    end = observation.rfind("}")
    if start == -1 or end < start:
        return {"raw": observation}
    try:
        parsed = json.loads(observation[start:end + 1])
    except (ValueError, RecursionError):
        return {"raw": observation}
    return parsed if isinstance(parsed, dict) else {"raw": observation}


def _crop_to_jpeg(image_path, bbox):
    """Return JPEG bytes of the bbox region of the image at *image_path*, or None if not possible."""
    try:
        x1, y1, x2, y2 = (int(v) for v in bbox)
    except (TypeError, ValueError):
        return None
    try:
        frame = cv2.imread(image_path)
    except Exception:
        logger.warning("Could not read image for cropping: %s", image_path)
        frame = None
    if frame is None:
        return None
    # Clamp to the image so negative coordinates cannot wrap around when slicing.
    height, width = frame.shape[:2]
    x1, x2 = max(0, x1), min(width, x2)
    y1, y2 = max(0, y1), min(height, y2)
    if x2 <= x1 or y2 <= y1:
        return None
    ok, buf = cv2.imencode(".jpg", frame[y1:y2, x1:x2])
    return buf.tobytes() if ok else None


def _build_alert_html(reason, suspect, frame_idx, conf_val):
    """Build the alert email body; model-supplied text is HTML-escaped."""
    from html import escape

    items = []
    if reason:
        items.append(f"<li><b>Reason:</b> {escape(str(reason))}</li>")
    if suspect:
        items.append(f"<li><b>Suspect:</b> {escape(str(suspect))}</li>")
    details_html = "<ul>" + "".join(items) + "</ul>"
    conf_text = f"{conf_val:.3f}" if conf_val is not None else "n/a"
    return (
        "\n<html><body>\n"
        "<h3>⚠ Alert - Suspicious activity detected</h3>\n"
        f"{details_html}\n"
        f"<p>Frame index: {frame_idx} | Confidence: {conf_text}</p>\n"
        "</body></html>\n"
    )


def analyze_with_gemini_if_enabled(openrouter_cfg, image_path, frame_idx, bbox, conf_val, out_dir, log_csv, video_path, smtp_cfg):
    """
    Ask the configured vision model whether the saved frame shows a theft, and log the result.

    Exactly one CSV row is appended to log_csv per call (skipped, analysed, or error).
    CSV columns (order): id, timestamp, video_path, frame_idx, label, model_conf,
                         bbox, x1, y1, x2, y2, image_path, crop_path,
                         email_sent, email_timestamp, notes

    openrouter_cfg: dict with api_key, model_name and optional base_url. When it is
        empty, has no api_key, or the OpenAI client is unavailable, the call is
        skipped and a row with notes "api_skipped" (and no bbox fields) is written.
    image_path: path of the saved frame sent to the model.
    frame_idx: integer frame index, used in the row id and crop file name.
    bbox: (x1, y1, x2, y2) of the detection, or None.
    conf_val: detector confidence, or None.
    out_dir: directory where the crop image is written.
    log_csv: path of the CSV log to append to.
    video_path: recorded in the CSV row.
    smtp_cfg: passed to send_email_with_attachment when alerts are enabled.

    When the model answers theft == "yes" and a crop of bbox can be produced, the
    crop is saved to out_dir and, if smtp_cfg is enabled, emailed as an alert.

    Returns the parsed reply dict, {"raw": <text>} when the reply holds no JSON
    object, {"skipped": True} when the API is disabled, or {"error": <text>} on failure.
    """
    try:
        # API disabled --> write a skipped row with minimal fields.
        if not openrouter_cfg or not openrouter_cfg.get("api_key") or OpenAI is None:
            _append_log_row(log_csv, video_path, frame_idx, conf_val, image_path,
                            notes="api_skipped")
            return {"skipped": True}

        # --- call OpenAI/OpenRouter ---
        client = OpenAI(base_url=openrouter_cfg.get("base_url", "https://openrouter.ai/api/v1"),
                        api_key=openrouter_cfg["api_key"])
        with open(image_path, "rb") as f:
            b64 = base64.b64encode(f.read()).decode("utf-8")
        prompt_text = (
            "\nYou are a visual analyst. Examine the image and answer with a concise JSON object.\n"
            "Return EXACT JSON keys: theft, reason, suspect_description.\n"
            "If theft detected use \"Yes\" else \"No\".\n"
        )
        messages = [
            {"role": "user",
             "content": [
                 {"type": "text", "text": prompt_text},
                 {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
             ]}
        ]
        completion = client.chat.completions.create(
            model=openrouter_cfg.get("model_name"),
            messages=messages,
        )
        # The content can be None (e.g. an empty completion); treat it as empty text.
        observation = (completion.choices[0].message.content or "").strip()
        api_json = _parse_model_reply(observation)
        api_theft = api_json.get("theft", "")
        api_reason = api_json.get("reason", "")
        api_suspect = api_json.get("suspect_description", "")

        crop_path = ""
        email_sent = False
        email_ts = ""
        # Theft confirmed -> crop the detection and maybe send an email.
        if api_theft and str(api_theft).strip().lower() == "yes" and bbox:
            crop_bytes = _crop_to_jpeg(image_path, bbox)
            if crop_bytes is not None:
                # Saving the crop is optional; a write failure must not block the alert.
                try:
                    crop_path = os.path.join(out_dir, f"crop_{frame_idx}.jpg")
                    with open(crop_path, "wb") as cf:
                        cf.write(crop_bytes)
                except (OSError, TypeError):
                    crop_path = ""
                if smtp_cfg and smtp_cfg.get("enabled"):
                    success, info = send_email_with_attachment(
                        smtp_cfg,
                        "Shoplifting Alert",
                        _build_alert_html(api_reason, api_suspect, frame_idx, conf_val),
                        attachment_bytes=crop_bytes,
                        attachment_filename=f"crop_{frame_idx}.jpg",
                    )
                    if success:
                        email_sent = True
                        email_ts = info  # timestamp

        # notes: the raw model reply flattened to a single line.
        _append_log_row(log_csv, video_path, frame_idx, conf_val, image_path,
                        bbox=bbox, crop_path=crop_path,
                        email_sent=email_sent, email_ts=email_ts,
                        notes=observation.replace("\n", " "))
        return api_json
    except Exception as e:
        logger.exception("[API ERROR]")
        # Still record the detection so the CSV has one row per analysed frame.
        try:
            _append_log_row(log_csv, video_path, frame_idx, conf_val, image_path,
                            bbox=bbox, notes=f"api_error: {e}")
        except Exception:
            logger.exception("[CSV WRITE ERROR]")
        return {"error": str(e)}
def _read_detections(result):
    """Return (boxes, confs, class_ids) as plain lists from a prediction result; empty lists on failure."""
    det = getattr(result, "boxes", None)
    if det is None:
        return [], [], []
    try:
        return (
            det.xyxy.cpu().numpy().tolist(),
            det.conf.cpu().numpy().tolist(),
            det.cls.cpu().numpy().astype(int).tolist(),
        )
    except Exception:
        # Not tensor-like (no .cpu()/.numpy()); fall through to the plain .tolist() path.
        pass
    try:
        cls_raw = getattr(det, "cls", None)
        return (
            det.xyxy.tolist(),
            det.conf.tolist(),
            [int(c) for c in cls_raw] if cls_raw is not None else [],
        )
    except Exception:
        logger.warning("Could not read detections from result; treating frame as empty.")
        return [], [], []


def _resolve_label(names, class_id):
    """Map *class_id* to its label via *names* (dict or sequence); fall back to the id as text."""
    label = str(class_id) if class_id is not None else "obj"
    try:
        if isinstance(names, dict):
            label = names.get(class_id, str(class_id))
        else:
            label = names[class_id]
    except (IndexError, KeyError, TypeError):
        pass
    return str(label)


def process_video_stream(
    video_path,
    model_path,
    out_root,
    openrouter_cfg=None,
    smtp_cfg=None,
    conf_thresh=0.5,
    confirm_conf_thresh=0.7,
    send_interval=4.0,
    confirmed_block_seconds=1000.0,
    progress_interval_frames=50
):
    """
    Generator that runs detection over a video and yields updates during processing.

    Yields dicts with keys: status, live_frame, suspicious_list, csv_path,
    annotated_video (only on the final yield).

    video_path: input video file.
    model_path: weights file passed to YOLO(...).
    out_root: output directory; a "suspicious_frames" sub-directory (frames, crops,
        CSV log) and the annotated video are written below it.
    openrouter_cfg / smtp_cfg: forwarded to analyze_with_gemini_if_enabled.
    conf_thresh: minimum confidence for a detection to be drawn/considered.
    confirm_conf_thresh: minimum confidence for a "shoplifting" detection to
        trigger saving the frame and calling the analysis step.
    send_interval: minimum wall-clock seconds between two saved frames.
    confirmed_block_seconds: additional wall-clock block started after each saved frame.
    progress_interval_frames: yield a progress update every N frames; 0 or None
        disables periodic progress updates.

    The annotated-video writer is released even if inference raises or the
    consumer stops iterating early.
    """
    _ensure_dir(out_root)
    suspicious_dir = os.path.join(out_root, "suspicious_frames")
    _ensure_dir(suspicious_dir)
    csv_path = os.path.join(suspicious_dir, "suspicious_log.csv")
    CSV_HEADER = [
        "id", "timestamp", "video_path", "frame_idx", "label", "model_conf",
        "bbox", "x1", "y1", "x2", "y2", "image_path", "crop_path",
        "email_sent", "email_timestamp", "notes"
    ]
    # Start every run with a fresh log containing only the header.
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(CSV_HEADER)

    # load model
    yield {"status": "Loading model...", "live_frame": "", "suspicious_list": [], "csv_path": csv_path}
    model = YOLO(model_path)

    # pick device: first CUDA device when available, else CPU
    device = "cpu"
    try:
        import torch
        device = 0 if torch.cuda.is_available() else "cpu"
    except Exception:
        # torch missing or failing to load: stay on CPU.
        device = "cpu"

    # The capture is only needed for metadata (the model reads the video itself),
    # so release it as soon as fps and frame size are known.
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            yield {"status": f"ERROR: Cannot open video {video_path}", "live_frame": "", "suspicious_list": [], "csv_path": csv_path}
            return
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    finally:
        cap.release()
    # Missing, zero or NaN fps (NaN fails the comparison) falls back to 25.
    if not (fps and fps > 0):
        fps = 25.0

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    annotated_video_path = os.path.join(out_root, f"annotated_{int(time.time())}.mp4")
    # Keep fractional rates such as 29.97; truncating to int changes playback speed.
    writer = cv2.VideoWriter(annotated_video_path, fourcc, float(fps), (frame_w, frame_h))

    last_sent_time = 0.0
    extra_block_until = 0.0
    suspicious_saved = []
    last_live = ""
    try:
        yield {"status": "Starting inference...", "live_frame": "", "suspicious_list": suspicious_saved.copy(), "csv_path": csv_path}
        results_gen = model.predict(source=video_path, stream=True, device=device, conf=conf_thresh)
        for frame_idx, r in enumerate(results_gen):
            source_frame = getattr(r, "orig_img", None)
            if source_frame is None:
                continue
            # orig_img is already in OpenCV's BGR channel order, so it is copied
            # as-is; converting RGB->BGR here would swap red and blue in the output.
            frame = source_frame.copy()

            boxes, confs, cls_idx = _read_detections(r)

            found_high_conf = False
            highest_conf = 0.0
            chosen_bbox = None
            for i, box in enumerate(boxes):
                conf = confs[i] if i < len(confs) else 0.0
                cid = cls_idx[i] if i < len(cls_idx) else None
                label = _resolve_label(model.names, cid)
                if conf < conf_thresh:
                    continue
                x1, y1, x2, y2 = [int(round(float(v))) for v in box[:4]]
                if label.strip().lower() == "shoplifting":
                    color = (0, 0, 255)  # red (BGR)
                    # Track the most confident shoplifting box above the confirm threshold.
                    if conf >= confirm_conf_thresh and conf > highest_conf:
                        found_high_conf = True
                        highest_conf = conf
                        chosen_bbox = (x1, y1, x2, y2)
                else:
                    color = (0, 255, 0)  # green (BGR)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                label_text = f"{label} {conf:.2f}"
                cv2.putText(frame, label_text, (x1, max(15, y1 - 5)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2, cv2.LINE_AA)

            # Throttling uses wall-clock time, not video time.
            now = time.time()
            if found_high_conf and now - last_sent_time >= send_interval and now >= extra_block_until:
                saved_name = f"frame_{frame_idx:06d}_{int(now)}.jpg"
                saved_path = os.path.join(suspicious_dir, saved_name)
                if cv2.imwrite(saved_path, frame):
                    suspicious_saved.append(saved_path)
                    last_live = saved_path
                    # call API & maybe send email (synchronously)
                    analyze_with_gemini_if_enabled(openrouter_cfg, saved_path, frame_idx, chosen_bbox, highest_conf,
                                                   suspicious_dir, csv_path, video_path, smtp_cfg)
                    last_sent_time = now
                    # NOTE(review): the long block starts after every saved frame,
                    # regardless of the analysis verdict — confirm this is intended.
                    extra_block_until = now + confirmed_block_seconds
                    # yield an update immediately with the live frame and updated gallery + csv_path
                    yield {"status": f"Suspicious frame saved: {os.path.basename(saved_path)} (frame {frame_idx})",
                           "live_frame": last_live,
                           "suspicious_list": suspicious_saved.copy(),
                           "csv_path": csv_path}
                else:
                    # Leave the throttle untouched so a later frame can retry.
                    logger.warning("Could not write suspicious frame to %s", saved_path)

            writer.write(frame)

            # periodic progress update (guarded so an interval of 0 cannot divide by zero)
            processed = frame_idx + 1
            if progress_interval_frames and processed % progress_interval_frames == 0:
                yield {"status": f"Processed {processed} frames...", "live_frame": last_live, "suspicious_list": suspicious_saved.copy(), "csv_path": csv_path}
    finally:
        # Runs on normal completion, on errors, and when the generator is closed
        # early, so the video file is always finalised.
        writer.release()

    # final yield with annotated video path
    yield {"status": "Finished processing.", "live_frame": last_live, "suspicious_list": suspicious_saved.copy(), "csv_path": csv_path, "annotated_video": annotated_video_path}