# image.py
"""
Processing logic for shoplifting detection (single-image version).

Exposes process_image(...) generator which yields updates while processing.
Each yielded update is a dict with keys:
 - status: human string
 - live_frame: path to latest suspicious frame image (or "")
 - suspicious_list: list of saved suspicious image paths so far
 - csv_path: path to CSV log (updated)
 - annotated_image: path to annotated image (only on final yield)
"""
import base64
import csv
import json
import logging
import mimetypes
import os
import smtplib
import time
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate

import cv2
from ultralytics import YOLO

# basic logger for these modules
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Optional OpenRouter client - only used if key provided.
try:
    from openai import OpenAI
except Exception:
    OpenAI = None

# Column order of the CSV log; every row written by this module follows it.
_CSV_HEADER = [
    "id", "timestamp", "video_path", "frame_idx", "label", "model_conf",
    "bbox", "x1", "y1", "x2", "y2", "image_path", "crop_path",
    "email_sent", "email_timestamp", "notes",
]

# Instruction sent to the vision model together with the image.
_PROMPT_TEXT = """
You are a visual analyst. Examine the image and answer with a concise JSON object.
Return EXACT JSON keys: theft, reason, suspect_description.
If theft detected use "Yes" else "No".
"""


def _ensure_dir(p):
    """Create directory *p* (and parents) if it does not exist yet."""
    os.makedirs(p, exist_ok=True)


def _now_str():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def _build_attachment(payload, filename):
    """Wrap raw bytes in a base64-encoded MIME part named *filename*."""
    ctype, _ = mimetypes.guess_type(filename)
    maintype, subtype = ctype.split("/", 1) if ctype else ("application", "octet-stream")
    part = MIMEBase(maintype, subtype)
    part.set_payload(payload)
    encoders.encode_base64(part)
    # Let the email package quote/encode the filename instead of hand-building
    # the header, so names containing quotes or non-ASCII stay well-formed.
    part.add_header("Content-Disposition", "attachment", filename=filename)
    return part


def send_email_with_attachment(smtp_cfg, subject, html_body, attachment_path=None,
                               attachment_bytes=None, attachment_filename=None):
    """
    Send an HTML e-mail with an optional single attachment.

    smtp_cfg: dict with smtp_server, smtp_port, email_user, email_pass, email_to, enabled
    Either attachment_path (str) or attachment_bytes (bytes) can be provided;
    attachment_bytes wins when both are given.
    If attachment_bytes provided and attachment_filename None -> uses 'attachment.jpg' as filename.
    A missing attachment_path is logged and the mail is sent without attachment.

    Returns (success: bool, timestamp_str or error_str). Never raises: any
    failure is logged and reported through the returned tuple.
    """
    if not smtp_cfg or not smtp_cfg.get("enabled"):
        return False, "smtp_disabled"

    try:
        smtp_server = smtp_cfg.get("smtp_server")
        smtp_port = int(smtp_cfg.get("smtp_port", 587))
        email_user = smtp_cfg.get("email_user")
        email_pass = smtp_cfg.get("email_pass")
        email_to = smtp_cfg.get("email_to")

        if not (smtp_server and smtp_port and email_user and email_pass and email_to):
            logger.warning("Incomplete SMTP config — cannot send email.")
            return False, "incomplete_smtp_cfg"

        msg = MIMEMultipart()
        msg["From"] = email_user
        msg["To"] = email_to
        msg["Date"] = formatdate(localtime=True)
        msg["Subject"] = subject
        msg.attach(MIMEText(html_body, "html"))

        # prepare attachment
        if attachment_bytes is not None:
            filename = attachment_filename if attachment_filename else "attachment.jpg"
            msg.attach(_build_attachment(attachment_bytes, filename))
        elif attachment_path is not None:
            if not os.path.exists(attachment_path):
                logger.warning(f"Attachment path does not exist: {attachment_path}")
            else:
                with open(attachment_path, "rb") as f:
                    file_bytes = f.read()
                filename = (attachment_filename if attachment_filename
                            else os.path.basename(attachment_path))
                msg.attach(_build_attachment(file_bytes, filename))

        # connect and send; the context manager closes the connection even
        # when login/sendmail raises (the socket used to leak in that case).
        use_ssl = smtp_port == 465
        smtp_cls = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
        with smtp_cls(smtp_server, smtp_port, timeout=20) as server:
            if not use_ssl:
                server.ehlo()
                try:
                    server.starttls()
                except Exception as e:
                    # best effort: some servers do not offer STARTTLS
                    logger.warning(f"starttls failed or not supported: {e}")
            server.login(email_user, email_pass)
            server.sendmail(email_user, [email_to], msg.as_string())

        return True, _now_str()
    except Exception as e:
        logger.exception("[EMAIL ERROR]")
        return False, str(e)


def _append_csv_row(log_csv, row):
    """Append one row to the CSV log at *log_csv*."""
    with open(log_csv, "a", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(row)


def _detection_row(frame_idx, source_path, conf_val, bbox, image_path,
                   crop_path="", email_sent=False, email_ts="", notes=""):
    """Build one 'shoplifting' CSV row in _CSV_HEADER order.

    bbox may be None/empty (coordinate columns stay blank); coordinates that
    cannot be converted to int are also left blank.
    """
    bbox_str = ""
    x1 = y1 = x2 = y2 = ""
    if bbox:
        try:
            x1, y1, x2, y2 = (int(v) for v in bbox)
            bbox_str = f"{x1},{y1},{x2},{y2}"
        except (TypeError, ValueError):
            bbox_str = ""
            x1 = y1 = x2 = y2 = ""
    return [
        f"frame_{frame_idx:06d}", _now_str(), source_path, frame_idx, "shoplifting",
        f"{conf_val:.3f}" if conf_val is not None else "",
        bbox_str, x1, y1, x2, y2,
        image_path,
        crop_path,
        str(email_sent), email_ts,
        notes,
    ]


def _extract_json(observation):
    """Parse the outermost {...} span of the model reply.

    Returns the parsed dict, or {"raw": observation} when no JSON object can
    be extracted.
    """
    start = observation.find("{")
    end = observation.rfind("}")
    # The previous check (`rfind(...) + 1 != -1`) could never fail; require a
    # closing brace located after the opening one instead.
    if start != -1 and end > start:
        try:
            parsed = json.loads(observation[start:end + 1])
        except ValueError:
            return {"raw": observation}
        if isinstance(parsed, dict):
            return parsed
    return {"raw": observation}


def _encode_crop(image_path, bbox):
    """Return JPEG bytes of the *bbox* region of the image, or None.

    The box is clamped to the image bounds so negative or oversized
    coordinates cannot produce a wrapped-around or empty slice.
    """
    try:
        x1, y1, x2, y2 = (int(v) for v in bbox)
    except (TypeError, ValueError):
        return None
    try:
        frame = cv2.imread(image_path)
    except Exception:
        frame = None
    if frame is None:
        return None
    height, width = frame.shape[:2]
    x1, x2 = max(0, x1), min(width, x2)
    y1, y2 = max(0, y1), min(height, y2)
    if x2 <= x1 or y2 <= y1:
        return None
    ok, crop_buf = cv2.imencode(".jpg", frame[y1:y2, x1:x2])
    return crop_buf.tobytes() if ok else None


def analyze_with_gemini_if_enabled(openrouter_cfg, image_path, frame_idx, bbox, conf_val,
                                   out_dir, log_csv, source_path, smtp_cfg):
    """
    Ask the configured vision model about one detection and log the outcome.

    Same function as in video.py (adapted call param name source_path instead of video_path).
    Writes a single CSV row inside this function.
    CSV columns (order):
    id, timestamp, video_path, frame_idx, label, model_conf, bbox, x1, y1, x2, y2,
    image_path, crop_path, email_sent, email_timestamp, notes

    When the model answers theft == "Yes" and a bbox is given, the bbox region
    is cropped from image_path, saved to out_dir as crop_<frame_idx>.jpg
    (best effort) and, if smtp_cfg is enabled, e-mailed as an attachment.

    Returns:
      - {"skipped": True} when no API key is configured or the OpenAI client
        is not installed (a row with notes "api_skipped" is written),
      - the parsed JSON dict (or {"raw": <reply>}) on success,
      - {"error": <message>} on any failure (a row with notes
        "api_error: ..." is written when possible).
    """
    try:
        # if API disabled --> write a skipped row with minimal fields
        if not openrouter_cfg or not openrouter_cfg.get("api_key") or OpenAI is None:
            _append_csv_row(
                log_csv,
                _detection_row(frame_idx, source_path, conf_val, None, image_path,
                               notes="api_skipped"),
            )
            return {"skipped": True}

        # --- call OpenAI/OpenRouter ---
        client = OpenAI(
            base_url=openrouter_cfg.get("base_url", "https://openrouter.ai/api/v1"),
            api_key=openrouter_cfg["api_key"],
        )

        with open(image_path, "rb") as f:
            b64 = base64.b64encode(f.read()).decode("utf-8")

        messages = [
            {"role": "user", "content": [
                {"type": "text", "text": _PROMPT_TEXT},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
            ]}
        ]

        completion = client.chat.completions.create(
            model=openrouter_cfg.get("model_name"),
            messages=messages,
        )

        # content can be None (e.g. refusals); treat that as an empty reply
        observation = (completion.choices[0].message.content or "").strip()
        api_summary = observation.replace("\n", " ")
        api_json = _extract_json(observation)
        api_theft = api_json.get("theft", "")

        crop_path = ""
        email_sent = False
        email_ts = ""

        # if theft yes -> crop and maybe send email
        if api_theft and str(api_theft).lower() == "yes" and bbox:
            crop_bytes = _encode_crop(image_path, bbox)
            if crop_bytes is not None:
                # optionally save a copy on disk; a write failure must not
                # prevent e-mailing the in-memory bytes
                crop_path = os.path.join(out_dir, f"crop_{frame_idx}.jpg")
                try:
                    with open(crop_path, "wb") as cf:
                        cf.write(crop_bytes)
                except OSError as write_err:
                    logger.warning(f"Could not save crop {crop_path}: {write_err}")
                    crop_path = ""

                if smtp_cfg and smtp_cfg.get("enabled"):
                    details_html = ""
                    # conf_val may be None; formatting it with :.3f would raise
                    conf_text = f"{conf_val:.3f}" if conf_val is not None else "n/a"
                    html_body = (
                        "\n\n⚠ Alert - Suspicious activity detected\n\n"
                        f"{details_html}\n\n"
                        f"Frame index: {frame_idx} | Confidence: {conf_text}\n\n"
                    )
                    success, info = send_email_with_attachment(
                        smtp_cfg,
                        "Shoplifting Alert",
                        html_body,
                        attachment_bytes=crop_bytes,
                        attachment_filename=f"crop_{frame_idx}.jpg",
                    )
                    if success:
                        email_sent = True
                        email_ts = info  # timestamp

        # notes: compact one-line copy of the raw API reply
        _append_csv_row(
            log_csv,
            _detection_row(frame_idx, source_path, conf_val, bbox, image_path,
                           crop_path=crop_path, email_sent=email_sent,
                           email_ts=email_ts, notes=api_summary),
        )
        return api_json

    except Exception as e:
        logger.exception("[API ERROR]")
        try:
            _append_csv_row(
                log_csv,
                _detection_row(frame_idx, source_path, conf_val, bbox, image_path,
                               notes=f"api_error: {e}"),
            )
        except Exception:
            logger.exception("[CSV WRITE ERROR]")
        return {"error": str(e)}


def _extract_detections(result):
    """Return (boxes, confs, class_ids) as plain lists from one YOLO result.

    Tries the tensor API first (.cpu().numpy()), then a plain .tolist()
    fallback; returns three empty lists when neither works.
    """
    box_data = getattr(result, "boxes", None)
    if box_data is None:
        return [], [], []
    try:
        return (
            box_data.xyxy.cpu().numpy().tolist(),
            box_data.conf.cpu().numpy().tolist(),
            box_data.cls.cpu().numpy().astype(int).tolist(),
        )
    except Exception:
        pass  # deliberate: fall through to the non-tensor representation
    try:
        raw_cls = getattr(box_data, "cls", None)
        class_ids = [int(x) for x in raw_cls] if raw_cls is not None else []
        return box_data.xyxy.tolist(), box_data.conf.tolist(), class_ids
    except Exception:
        return [], [], []


def _label_for(model, cid):
    """Return the class name for *cid*, str(cid) if unknown, 'obj' if cid is None."""
    if cid is None:
        return "obj"
    try:
        names = model.names
        if isinstance(names, dict):
            return names.get(cid, str(cid))
        return names[cid]
    except Exception:
        return str(cid)


def process_image(
    image_path,
    model_path,
    out_root,
    openrouter_cfg=None,
    smtp_cfg=None,
    conf_thresh=0.5,
    confirm_conf_thresh=0.7
):
    """
    Generator that yields updates during processing of a single image.
    Yields dicts with keys: status, live_frame, suspicious_list, csv_path, annotated_image (only final)

    image_path: image to scan.
    model_path: weights passed to ultralytics.YOLO.
    out_root: output directory; suspicious frames and the CSV log go to
        <out_root>/suspicious_images, the final annotated image to <out_root>.
    openrouter_cfg / smtp_cfg: forwarded to analyze_with_gemini_if_enabled.
    conf_thresh: minimum confidence for a detection to be drawn.
    confirm_conf_thresh: minimum confidence for a "shoplifting" detection to
        be saved, analysed and logged as suspicious.

    The CSV log is (re)created with its header on every call. If the image is
    missing or unreadable a single ERROR status is yielded and the generator
    ends without a final "annotated_image" update.
    """
    _ensure_dir(out_root)
    suspicious_dir = os.path.join(out_root, "suspicious_images")
    _ensure_dir(suspicious_dir)
    csv_path = os.path.join(suspicious_dir, "suspicious_log.csv")

    # create CSV header
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(_CSV_HEADER)

    # load model
    yield {"status": "Loading model...", "live_frame": "", "suspicious_list": [], "csv_path": csv_path}
    model = YOLO(model_path)

    # pick device
    device = "cpu"
    try:
        import torch
        device = 0 if torch.cuda.is_available() else "cpu"
    except Exception:
        device = "cpu"

    # read image
    if not os.path.exists(image_path):
        yield {"status": f"ERROR: Cannot open image {image_path}", "live_frame": "", "suspicious_list": [], "csv_path": csv_path}
        return

    frame_bgr = cv2.imread(image_path)
    if frame_bgr is None:
        yield {"status": f"ERROR: Failed to read image {image_path}", "live_frame": "", "suspicious_list": [], "csv_path": csv_path}
        return

    yield {"status": "Starting inference on image...", "live_frame": "", "suspicious_list": [], "csv_path": csv_path}

    # run prediction (non-stream)
    results = model.predict(source=image_path, device=device, conf=conf_thresh)

    annotated = frame_bgr.copy()
    suspicious_saved = []
    last_live = ""
    detection_idx = 0
    found_any = False

    # results is a list (one element per image)
    for r in results:
        boxes, confs, cls_idx = _extract_detections(r)

        for i, box in enumerate(boxes):
            conf = confs[i] if i < len(confs) else 0.0
            cid = cls_idx[i] if i < len(cls_idx) else None
            label = _label_for(model, cid)

            if conf < conf_thresh:
                continue

            x1, y1, x2, y2 = [int(round(float(v))) for v in box[:4]]
            is_shoplifting = str(label).strip().lower() == "shoplifting"
            color = (0, 0, 255) if is_shoplifting else (0, 255, 0)

            # draw on annotated image
            cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
            label_text = f"{label} {conf:.2f}"
            cv2.putText(annotated, label_text, (x1, max(15, y1 - 5)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2, cv2.LINE_AA)

            # if this detection qualifies as suspicious (confirmed)
            if is_shoplifting and conf >= confirm_conf_thresh:
                found_any = True
                saved_name = f"img_{detection_idx:03d}_{int(time.time())}.jpg"
                saved_path = os.path.join(suspicious_dir, saved_name)
                # save the full annotated frame as suspicious saved (keeps parity with video flow)
                if not cv2.imwrite(saved_path, annotated):
                    logger.warning(f"Failed to write suspicious image: {saved_path}")
                suspicious_saved.append(saved_path)
                last_live = saved_path

                # call API & maybe send email (synchronously)
                analyze_with_gemini_if_enabled(openrouter_cfg, saved_path, detection_idx,
                                               (x1, y1, x2, y2), conf, suspicious_dir,
                                               csv_path, image_path, smtp_cfg)

                # yield an update for this detection
                yield {"status": f"Suspicious detection saved: {os.path.basename(saved_path)} (det {detection_idx})",
                       "live_frame": last_live,
                       "suspicious_list": suspicious_saved.copy(),
                       "csv_path": csv_path}

            detection_idx += 1
        # end boxes loop

    # write final annotated image
    annotated_image_path = os.path.join(out_root, f"annotated_{int(time.time())}.jpg")
    if not cv2.imwrite(annotated_image_path, annotated):
        logger.warning(f"Failed to write annotated image: {annotated_image_path}")

    # if no suspicious detections found, still write a CSV row to note scan (optional)
    if not found_any:
        # write a "no suspicious" row to CSV for traceability
        row = [
            "no_suspicious_000000", _now_str(), image_path, 0, "none", "",
            "", "", "", "", "",
            image_path,
            "",         # no crop
            "False", "",  # email_sent, email_timestamp
            "no_suspicious_detections",
        ]
        _append_csv_row(csv_path, row)

        yield {"status": "No suspicious detections found.", "live_frame": last_live, "suspicious_list": suspicious_saved.copy(), "csv_path": csv_path}

    # final yield with annotated image path
    yield {"status": "Finished processing image.", "live_frame": last_live, "suspicious_list": suspicious_saved.copy(), "csv_path": csv_path, "annotated_image": annotated_image_path}