|
|
|
""" |
|
Processing logic for shoplifting detection (streaming version). |
|
Exposes the process_video_stream(...) generator, which yields progress updates while a video is being processed.
|
Each yielded update is a dict with keys: |
|
- status: human string |
|
- live_frame: path to latest suspicious frame image (or "") |
|
- suspicious_list: list of saved suspicious image paths so far |
|
- csv_path: path to CSV log (updated) |
|
- annotated_video: path to the final annotated video (present only in the final "Finished processing." yield)
|
""" |
|
|
|
import os |
|
import time |
|
import csv |
|
import json |
|
import base64 |
|
import cv2 |
|
from ultralytics import YOLO |
|
|
|
import logging |
|
|
|
|
|
# NOTE(review): basicConfig runs at import time, so importing this module
# configures the root logger for the whole process (per the logging docs it is
# a no-op when the root logger already has handlers).
logging.basicConfig(level=logging.INFO)

# Module-level logger for this file.
logger = logging.getLogger(__name__)

# The openai SDK is optional. It is used as an OpenAI-compatible client for the
# vision-model check. If it cannot be imported (for any reason, not only
# ImportError), OpenAI is set to None and the API analysis step is skipped
# instead of failing.
try:
    from openai import OpenAI
except Exception:
    OpenAI = None
|
|
|
def _ensure_dir(p):
    """Create directory ``p`` together with any missing parents.

    Calling it on a directory that already exists is harmless: ``exist_ok=True``
    suppresses the FileExistsError that ``os.makedirs`` would otherwise raise.
    """
    target_dir = p
    os.makedirs(target_dir, exist_ok=True)
|
|
|
def send_email_with_attachment(smtp_cfg, subject, html_body, attachment_path=None, attachment_bytes=None, attachment_filename=None):
    """
    Send an HTML e-mail, optionally with a single attachment.

    smtp_cfg: dict with smtp_server, smtp_port, email_user, email_pass, email_to, enabled.
        email_to may be a single address, a comma-separated string of addresses,
        or a list/tuple of addresses.
        Port 465 uses implicit TLS (SMTP_SSL). Any other port connects in plain
        text and attempts a STARTTLS upgrade.
    Either attachment_path (str) or attachment_bytes (bytes) can be provided.
    attachment_bytes takes precedence when both are given.
    If attachment_bytes provided and attachment_filename None -> uses 'attachment.jpg' as filename.

    Returns (success: bool, timestamp_str or error_str):
        (False, "smtp_disabled")        if smtp_cfg is falsy or not enabled
        (False, "incomplete_smtp_cfg")  if a required setting is missing
        (False, "<exception text>")     on any other failure (also logged)
        (True, "YYYY-mm-dd HH:MM:SS")   local-time send timestamp on success
    """
    if not smtp_cfg or not smtp_cfg.get("enabled"):
        return False, "smtp_disabled"

    try:
        import mimetypes
        import smtplib
        from email import encoders
        from email.mime.base import MIMEBase
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText
        from email.utils import formatdate

        smtp_server = smtp_cfg.get("smtp_server")
        # `or 587` also covers an explicit None, which int() would reject.
        smtp_port = int(smtp_cfg.get("smtp_port") or 587)
        email_user = smtp_cfg.get("email_user")
        email_pass = smtp_cfg.get("email_pass")
        email_to = smtp_cfg.get("email_to")

        # Normalise recipients. sendmail() needs one entry per address, so a
        # comma-separated string must be split rather than passed as a single address.
        if isinstance(email_to, (list, tuple)):
            recipients = [str(addr).strip() for addr in email_to if str(addr).strip()]
        else:
            recipients = [addr.strip() for addr in str(email_to or "").split(",") if addr.strip()]

        if not (smtp_server and smtp_port and email_user and email_pass and recipients):
            logger.warning("Incomplete SMTP config — cannot send email.")
            return False, "incomplete_smtp_cfg"

        msg = MIMEMultipart()
        msg["From"] = email_user
        msg["To"] = ", ".join(recipients)
        msg["Date"] = formatdate(localtime=True)
        msg["Subject"] = subject
        msg.attach(MIMEText(html_body, "html"))

        # Resolve the attachment payload once so the MIME part is built in one place.
        payload = None
        filename = None
        if attachment_bytes is not None:
            payload = attachment_bytes
            filename = attachment_filename or "attachment.jpg"
        elif attachment_path is not None:
            if not os.path.exists(attachment_path):
                logger.warning(f"Attachment path does not exist: {attachment_path}")
            else:
                with open(attachment_path, "rb") as fh:
                    payload = fh.read()
                filename = attachment_filename or os.path.basename(attachment_path)

        if payload is not None:
            ctype, _encoding = mimetypes.guess_type(filename)
            maintype, subtype = ctype.split("/", 1) if ctype else ("application", "octet-stream")
            part = MIMEBase(maintype, subtype)
            part.set_payload(payload)
            encoders.encode_base64(part)
            # Passing filename as a parameter lets the email package quote/encode it
            # correctly (quotes, non-ASCII) instead of hand-building the header.
            part.add_header("Content-Disposition", "attachment", filename=filename)
            msg.attach(part)

        smtp_cls = smtplib.SMTP_SSL if smtp_port == 465 else smtplib.SMTP
        server = smtp_cls(smtp_server, smtp_port, timeout=20)
        try:
            server.ehlo()
            # STARTTLS only makes sense on a plain connection. On SMTP_SSL it always
            # fails, so skip it there.
            if smtp_port != 465:
                try:
                    server.starttls()
                except Exception as e:
                    # NOTE(review): best-effort. If the upgrade fails, login proceeds
                    # over an unencrypted connection.
                    logger.warning(f"starttls failed or not supported: {e}")
            server.login(email_user, email_pass)
            server.sendmail(email_user, recipients, msg.as_string())
        finally:
            # Always release the connection, including when login/sendmail raised.
            try:
                server.quit()
            except Exception:
                server.close()

        ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        return True, ts

    except Exception as e:
        logger.exception("[EMAIL ERROR]")
        return False, str(e)
|
|
|
def _bbox_fields(bbox):
    """Return (bbox_str, x1, y1, x2, y2) for a CSV row; all "" if bbox is missing or malformed."""
    if not bbox:
        return "", "", "", "", ""
    try:
        x1, y1, x2, y2 = (int(v) for v in bbox)
    except (TypeError, ValueError):
        return "", "", "", "", ""
    return f"{x1},{y1},{x2},{y2}", x1, y1, x2, y2


def _append_log_row(log_csv, frame_idx, video_path, conf_val, bbox, image_path,
                    crop_path="", email_sent=False, email_ts="", notes=""):
    """Append one row to log_csv using the column order documented on analyze_with_gemini_if_enabled."""
    bbox_str, x1, y1, x2, y2 = _bbox_fields(bbox)
    row = [
        f"frame_{frame_idx:06d}",
        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        video_path, frame_idx, "shoplifting",
        f"{conf_val:.3f}" if conf_val is not None else "",
        bbox_str, x1, y1, x2, y2,
        image_path, crop_path,
        str(email_sent), email_ts,
        notes,
    ]
    with open(log_csv, "a", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(row)


def _extract_json_object(text):
    """Parse the outermost {...} span of text as JSON; fall back to {"raw": text}."""
    start = text.find("{")
    end = text.rfind("}")
    # Require a closing brace after the opening one (rfind returns -1 when absent).
    if start == -1 or end <= start:
        return {"raw": text}
    try:
        return json.loads(text[start:end + 1])
    except ValueError:
        return {"raw": text}


def _crop_jpeg(image_path, bbox):
    """Return JPEG-encoded bytes of bbox cropped from the image at image_path, or None."""
    if not bbox:
        return None
    frame = cv2.imread(image_path)
    if frame is None:
        return None
    height, width = frame.shape[:2]
    x1, y1, x2, y2 = (int(v) for v in bbox)
    # Clamp to the image. Detector boxes can spill past the edges, and negative
    # indices would silently wrap around in numpy slicing.
    x1, x2 = max(0, min(x1, width)), max(0, min(x2, width))
    y1, y2 = max(0, min(y1, height)), max(0, min(y2, height))
    if x2 <= x1 or y2 <= y1:
        return None
    ok, buf = cv2.imencode(".jpg", frame[y1:y2, x1:x2])
    return buf.tobytes() if ok else None


def _build_alert_html(frame_idx, conf_val, api_reason, api_suspect):
    """Build the HTML body of the alert e-mail; model-provided text is HTML-escaped."""
    import html

    items = []
    if api_reason:
        items.append(f"<li><b>Reason:</b> {html.escape(str(api_reason))}</li>")
    if api_suspect:
        items.append(f"<li><b>Suspect:</b> {html.escape(str(api_suspect))}</li>")
    details_html = "<ul>" + "".join(items) + "</ul>"
    conf_str = f"{conf_val:.3f}" if conf_val is not None else "n/a"
    return f"""
<html><body>
<h3>⚠ Alert - Suspicious activity detected</h3>
{details_html}
<p>Frame index: {frame_idx} | Confidence: {conf_str}</p>
</body></html>
"""


def analyze_with_gemini_if_enabled(openrouter_cfg, image_path, frame_idx, bbox, conf_val, out_dir, log_csv, video_path, smtp_cfg):
    """
    Ask an OpenAI-compatible vision model whether image_path shows theft.

    If the model reports theft, this also saves a crop and optionally e-mails an
    alert. It always tries to append exactly one row to log_csv.

    Args:
        openrouter_cfg: dict with api_key, and optionally base_url (defaults to the
            OpenRouter endpoint) and model_name. The API call is skipped when this is
            falsy, has no api_key, or the openai package is unavailable.
        image_path: saved frame to analyse; sent base64-encoded as a JPEG data URL.
        frame_idx: frame index, used in the row id and in crop/attachment names.
        bbox: (x1, y1, x2, y2) pixel box of the detection, or None.
        conf_val: detector confidence, or None.
        out_dir: directory where crop_<frame_idx>.jpg is written.
        log_csv: CSV file to append to (the header is written by the caller).
        video_path: source video, recorded in the row.
        smtp_cfg: forwarded to send_email_with_attachment when theft is reported.

    CSV columns (order): id, timestamp, video_path, frame_idx, label, model_conf,
        bbox, x1, y1, x2, y2, image_path, crop_path,
        email_sent, email_timestamp, notes

    Returns:
        {"skipped": True} when the API is not configured.
        {"error": "<text>"} when anything fails.
        Otherwise, the JSON object parsed from the model reply, or {"raw": reply}
        when no JSON could be parsed.
    """
    try:
        if not openrouter_cfg or not openrouter_cfg.get("api_key") or OpenAI is None:
            _append_log_row(log_csv, frame_idx, video_path, conf_val, bbox, image_path,
                            notes="api_skipped")
            return {"skipped": True}

        client = OpenAI(base_url=openrouter_cfg.get("base_url", "https://openrouter.ai/api/v1"),
                        api_key=openrouter_cfg["api_key"])

        with open(image_path, "rb") as f:
            b64 = base64.b64encode(f.read()).decode("utf-8")

        prompt_text = """
You are a visual analyst. Examine the image and answer with a concise JSON object.
Return EXACT JSON keys: theft, reason, suspect_description.
If theft detected use "Yes" else "No".
"""
        messages = [
            {"role": "user",
             "content": [
                 {"type": "text", "text": prompt_text},
                 {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
             ]},
        ]

        completion = client.chat.completions.create(
            model=openrouter_cfg.get("model_name"),
            messages=messages,
        )
        # message.content may be None (e.g. empty/refused replies); treat that as "".
        observation = (completion.choices[0].message.content or "").strip()
        api_summary = observation.replace("\n", " ")

        api_json = _extract_json_object(observation)
        is_dict = isinstance(api_json, dict)
        api_theft = api_json.get("theft", "") if is_dict else ""
        api_reason = api_json.get("reason", "") if is_dict else ""
        api_suspect = api_json.get("suspect_description", "") if is_dict else ""

        crop_path = ""
        email_sent = False
        email_ts = ""

        if str(api_theft).strip().lower() == "yes":
            crop_bytes = None
            try:
                crop_bytes = _crop_jpeg(image_path, bbox)
            except Exception:
                # A failed crop must not discard the model verdict or the alert.
                logger.exception("[CROP ERROR]")

            if crop_bytes is not None:
                try:
                    crop_path = os.path.join(out_dir, f"crop_{frame_idx}.jpg")
                    with open(crop_path, "wb") as cf:
                        cf.write(crop_bytes)
                except OSError:
                    crop_path = ""

            if smtp_cfg and smtp_cfg.get("enabled"):
                html_body = _build_alert_html(frame_idx, conf_val, api_reason, api_suspect)
                if crop_bytes is not None:
                    success, info = send_email_with_attachment(
                        smtp_cfg,
                        "Shoplifting Alert",
                        html_body,
                        attachment_bytes=crop_bytes,
                        attachment_filename=f"crop_{frame_idx}.jpg",
                    )
                else:
                    # No usable crop: attach the full saved frame rather than dropping the alert.
                    success, info = send_email_with_attachment(
                        smtp_cfg,
                        "Shoplifting Alert",
                        html_body,
                        attachment_path=image_path,
                    )
                if success:
                    email_sent = True
                    email_ts = info

        if api_summary:
            notes = api_summary
        elif api_theft:
            notes = f"theft:{api_theft} reason:{api_reason}"
        else:
            notes = ""

        _append_log_row(log_csv, frame_idx, video_path, conf_val, bbox, image_path,
                        crop_path=crop_path, email_sent=email_sent, email_ts=email_ts,
                        notes=notes)
        return api_json

    except Exception as e:
        logger.exception("[API ERROR]")
        try:
            _append_log_row(log_csv, frame_idx, video_path, conf_val, bbox, image_path,
                            notes=f"api_error: {e}")
        except Exception as e2:
            logger.error(f"[CSV WRITE ERROR] {e2}")
        return {"error": str(e)}
|
|
|
|
|
def _extract_detections(r):
    """Return (boxes, confs, cls_idx) lists from one Ultralytics result; empty lists if unavailable."""
    boxes_obj = getattr(r, "boxes", None)
    if boxes_obj is None:
        return [], [], []
    try:
        return (
            boxes_obj.xyxy.cpu().numpy().tolist(),
            boxes_obj.conf.cpu().numpy().tolist(),
            boxes_obj.cls.cpu().numpy().astype(int).tolist(),
        )
    except Exception:
        pass
    # Fallback for non-tensor attributes (e.g. already numpy/list).
    try:
        cls = getattr(boxes_obj, "cls", None)
        return (
            boxes_obj.xyxy.tolist(),
            boxes_obj.conf.tolist(),
            [int(x) for x in cls] if cls is not None else [],
        )
    except Exception:
        return [], [], []


def _resolve_label(names, cid):
    """Map class id cid to a display name via names (dict or list); falls back to the id, or "obj" if unknown."""
    if cid is None:
        return "obj"
    try:
        if isinstance(names, dict):
            return str(names.get(cid, str(cid)))
        return str(names[cid])
    except Exception:
        return str(cid)


def process_video_stream(
    video_path,
    model_path,
    out_root,
    openrouter_cfg=None,
    smtp_cfg=None,
    conf_thresh=0.5,
    confirm_conf_thresh=0.7,
    send_interval=4.0,
    confirmed_block_seconds=1000.0,
    progress_interval_frames=50
):
    """
    Generator that runs YOLO over a video and yields updates during processing.

    Each yielded dict has the keys status, live_frame, suspicious_list and csv_path.
    Only the final "Finished processing." update also carries annotated_video.

    Args:
        video_path: input video, opened by OpenCV (for metadata) and by YOLO.predict.
        model_path: weights path passed to ultralytics.YOLO.
        out_root: output directory. The annotated video goes here; suspicious frames,
            crops and suspicious_log.csv go to out_root/suspicious_frames.
        openrouter_cfg, smtp_cfg: forwarded to analyze_with_gemini_if_enabled.
        conf_thresh: minimum confidence for a box to be drawn (also passed to predict).
        confirm_conf_thresh: minimum confidence for a "shoplifting" box to trigger
            saving and analysing the frame.
        send_interval: minimum wall-clock seconds between two triggers.
        confirmed_block_seconds: wall-clock seconds during which further triggers are
            suppressed after one fires. With the default of 1000 s this dominates
            send_interval.
        progress_interval_frames: emit a "Processed N frames..." update every N frames.
            A falsy value (0/None) disables these updates.
    """
    _ensure_dir(out_root)
    suspicious_dir = os.path.join(out_root, "suspicious_frames")
    _ensure_dir(suspicious_dir)

    csv_path = os.path.join(suspicious_dir, "suspicious_log.csv")
    CSV_HEADER = [
        "id", "timestamp", "video_path", "frame_idx", "label", "model_conf",
        "bbox", "x1", "y1", "x2", "y2", "image_path", "crop_path",
        "email_sent", "email_timestamp", "notes"
    ]
    # Start a fresh log; analyze_with_gemini_if_enabled appends one row per analysed frame.
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(CSV_HEADER)

    yield {"status": "Loading model...", "live_frame": "", "suspicious_list": [], "csv_path": csv_path}
    model = YOLO(model_path)

    try:
        import torch
        device = 0 if torch.cuda.is_available() else "cpu"
    except Exception:
        device = "cpu"

    # OpenCV is only used to validate the input and read its metadata.
    # YOLO.predict decodes the frames itself below.
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            yield {"status": f"ERROR: Cannot open video {video_path}", "live_frame": "", "suspicious_list": [], "csv_path": csv_path}
            return
        fps = cap.get(cv2.CAP_PROP_FPS) or 25
        frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    finally:
        cap.release()

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    annotated_video_path = os.path.join(out_root, f"annotated_{int(time.time())}.mp4")
    # Keep fractional fps: truncating (e.g. 29.97 -> 29) makes the output drift out of sync.
    writer = cv2.VideoWriter(annotated_video_path, fourcc, float(fps), (frame_w, frame_h))

    names = model.names
    last_sent_time = 0.0
    extra_block_until = 0.0
    frame_idx = 0
    suspicious_saved = []
    last_live = ""

    # try/finally ensures the writer is released even if the consumer stops iterating early.
    try:
        yield {"status": "Starting inference...", "live_frame": "", "suspicious_list": suspicious_saved, "csv_path": csv_path}

        results_gen = model.predict(source=video_path, stream=True, device=device, conf=conf_thresh)
        for r in results_gen:
            orig = getattr(r, "orig_img", None)
            if orig is None:
                frame_idx += 1
                continue
            # Ultralytics' orig_img is already in OpenCV BGR order. Converting it again
            # swapped red/blue in the saved frames and the annotated video.
            frame = orig.copy()

            boxes, confs, cls_idx = _extract_detections(r)

            found_high_conf = False
            highest_conf = 0.0
            chosen_bbox = None

            for i, box in enumerate(boxes):
                conf = confs[i] if i < len(confs) else 0.0
                if conf < conf_thresh:
                    continue
                cid = cls_idx[i] if i < len(cls_idx) else None
                label = _resolve_label(names, cid)

                x1, y1, x2, y2 = (int(round(float(v))) for v in box[:4])

                if label.strip().lower() == "shoplifting":
                    color = (0, 0, 255)
                    # Track the single most confident confirmed shoplifting box in this frame.
                    if conf >= confirm_conf_thresh and conf > highest_conf:
                        found_high_conf = True
                        highest_conf = conf
                        chosen_bbox = (x1, y1, x2, y2)
                else:
                    color = (0, 255, 0)

                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                label_text = f"{label} {conf:.2f}"
                cv2.putText(frame, label_text, (x1, max(15, y1 - 5)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2, cv2.LINE_AA)

            now = time.time()
            if found_high_conf and now - last_sent_time >= send_interval and now >= extra_block_until:
                saved_name = f"frame_{frame_idx:06d}_{int(time.time())}.jpg"
                saved_path = os.path.join(suspicious_dir, saved_name)
                if cv2.imwrite(saved_path, frame):
                    suspicious_saved.append(saved_path)
                    last_live = saved_path

                    # Synchronous: blocks this generator while the API/e-mail round-trip runs.
                    analyze_with_gemini_if_enabled(openrouter_cfg, saved_path, frame_idx, chosen_bbox, highest_conf,
                                                   suspicious_dir, csv_path, video_path, smtp_cfg)

                    last_sent_time = now
                    extra_block_until = now + confirmed_block_seconds

                    yield {"status": f"Suspicious frame saved: {os.path.basename(saved_path)} (frame {frame_idx})",
                           "live_frame": last_live,
                           "suspicious_list": suspicious_saved.copy(),
                           "csv_path": csv_path}
                else:
                    # Don't record or analyse a file that was never written; retry on a later frame.
                    logger.warning(f"Could not save suspicious frame to {saved_path}")

            writer.write(frame)
            frame_idx += 1

            if progress_interval_frames and frame_idx % progress_interval_frames == 0:
                yield {"status": f"Processed {frame_idx} frames...", "live_frame": last_live, "suspicious_list": suspicious_saved.copy(), "csv_path": csv_path}
    finally:
        writer.release()

    yield {"status": "Finished processing.", "live_frame": last_live, "suspicious_list": suspicious_saved.copy(), "csv_path": csv_path, "annotated_video": annotated_video_path}
|
|