|
|
|
""" |
|
Processing logic for shoplifting detection (single-image version). |
|
Exposes process_image(...) generator which yields updates while processing. |
|
Each yielded update is a dict with keys: |
|
- status: human string |
|
- live_frame: path to latest suspicious frame image (or "") |
|
- suspicious_list: list of saved suspicious image paths so far |
|
- csv_path: path to CSV log (updated) |
|
- annotated_image: path to annotated image (only on final yield) |
|
""" |
|
|
|
import os |
|
import time |
|
import csv |
|
import json |
|
import base64 |
|
import cv2 |
|
from ultralytics import YOLO |
|
|
|
import logging |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
try: |
|
from openai import OpenAI |
|
except Exception: |
|
OpenAI = None |
|
|
|
def _ensure_dir(p): |
|
os.makedirs(p, exist_ok=True) |
|
|
|
def send_email_with_attachment(smtp_cfg, subject, html_body, attachment_path=None, attachment_bytes=None, attachment_filename=None): |
|
""" |
|
smtp_cfg: dict with smtp_server, smtp_port, email_user, email_pass, email_to, enabled |
|
Either attachment_path (str) or attachment_bytes (bytes) can be provided. |
|
If attachment_bytes provided and attachment_filename None -> uses 'attachment.jpg' as filename. |
|
Returns (success: bool, timestamp_str or error_str) |
|
""" |
|
if not smtp_cfg or not smtp_cfg.get("enabled"): |
|
return False, "smtp_disabled" |
|
|
|
try: |
|
import smtplib |
|
from email.mime.multipart import MIMEMultipart |
|
from email.mime.text import MIMEText |
|
from email.mime.base import MIMEBase |
|
from email import encoders |
|
from email.utils import formatdate |
|
import mimetypes |
|
|
|
smtp_server = smtp_cfg.get("smtp_server") |
|
smtp_port = int(smtp_cfg.get("smtp_port", 587)) |
|
email_user = smtp_cfg.get("email_user") |
|
email_pass = smtp_cfg.get("email_pass") |
|
email_to = smtp_cfg.get("email_to") |
|
|
|
if not (smtp_server and smtp_port and email_user and email_pass and email_to): |
|
logger.warning("Incomplete SMTP config — cannot send email.") |
|
return False, "incomplete_smtp_cfg" |
|
|
|
msg = MIMEMultipart() |
|
msg["From"] = email_user |
|
msg["To"] = email_to |
|
msg["Date"] = formatdate(localtime=True) |
|
msg["Subject"] = subject |
|
msg.attach(MIMEText(html_body, "html")) |
|
|
|
|
|
if attachment_bytes is not None: |
|
filename = attachment_filename if attachment_filename else "attachment.jpg" |
|
ctype, encoding = mimetypes.guess_type(filename) |
|
maintype, subtype = (ctype.split("/", 1) if ctype else ("application", "octet-stream")) |
|
part = MIMEBase(maintype, subtype) |
|
part.set_payload(attachment_bytes) |
|
encoders.encode_base64(part) |
|
part.add_header("Content-Disposition", f'attachment; filename="{filename}"') |
|
msg.attach(part) |
|
elif attachment_path is not None: |
|
if not os.path.exists(attachment_path): |
|
logger.warning(f"Attachment path does not exist: {attachment_path}") |
|
else: |
|
with open(attachment_path, "rb") as f: |
|
file_bytes = f.read() |
|
filename = attachment_filename if attachment_filename else os.path.basename(attachment_path) |
|
ctype, encoding = mimetypes.guess_type(filename) |
|
maintype, subtype = (ctype.split("/", 1) if ctype else ("application", "octet-stream")) |
|
part = MIMEBase(maintype, subtype) |
|
part.set_payload(file_bytes) |
|
encoders.encode_base64(part) |
|
part.add_header("Content-Disposition", f'attachment; filename="{filename}"') |
|
msg.attach(part) |
|
|
|
|
|
if smtp_port == 465: |
|
server = smtplib.SMTP_SSL(smtp_server, smtp_port, timeout=20) |
|
else: |
|
server = smtplib.SMTP(smtp_server, smtp_port, timeout=20) |
|
server.ehlo() |
|
try: |
|
server.starttls() |
|
except Exception as e: |
|
logger.warning(f"starttls failed or not supported: {e}") |
|
|
|
server.login(email_user, email_pass) |
|
server.sendmail(email_user, [email_to], msg.as_string()) |
|
server.quit() |
|
ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) |
|
return True, ts |
|
|
|
except Exception as e: |
|
logger.exception("[EMAIL ERROR]") |
|
return False, str(e) |
|
|
|
|
|
def analyze_with_gemini_if_enabled(openrouter_cfg, image_path, frame_idx, bbox, conf_val, out_dir, log_csv, source_path, smtp_cfg): |
|
""" |
|
Same function as in video.py (adapted call param name source_path instead of video_path). |
|
Writes a single CSV row inside this function. |
|
CSV columns (order): id, timestamp, video_path, frame_idx, label, model_conf, |
|
bbox, x1, y1, x2, y2, image_path, crop_path, |
|
email_sent, email_timestamp, notes |
|
""" |
|
api_summary = "" |
|
try: |
|
|
|
if not openrouter_cfg or not openrouter_cfg.get("api_key") or OpenAI is None: |
|
ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) |
|
uid = f"frame_{frame_idx:06d}" |
|
bbox_str = "" |
|
x1 = y1 = x2 = y2 = "" |
|
row = [ |
|
uid, ts, source_path, frame_idx, "shoplifting", |
|
f"{conf_val:.3f}" if conf_val is not None else "", |
|
bbox_str, x1, y1, x2, y2, |
|
image_path, "", |
|
"False", "", |
|
"api_skipped" |
|
] |
|
with open(log_csv, "a", newline="", encoding="utf-8") as f: |
|
writer = csv.writer(f) |
|
writer.writerow(row) |
|
return {"skipped": True} |
|
|
|
|
|
client = OpenAI(base_url=openrouter_cfg.get("base_url", "https://openrouter.ai/api/v1"), |
|
api_key=openrouter_cfg["api_key"]) |
|
|
|
with open(image_path, "rb") as f: |
|
b64 = base64.b64encode(f.read()).decode("utf-8") |
|
|
|
prompt_text = """ |
|
You are a visual analyst. Examine the image and answer with a concise JSON object. |
|
Return EXACT JSON keys: theft, reason, suspect_description. |
|
If theft detected use "Yes" else "No". |
|
""" |
|
|
|
messages = [ |
|
{"role": "user", |
|
"content": [ |
|
{"type": "text", "text": prompt_text}, |
|
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}} |
|
]} |
|
] |
|
|
|
completion = client.chat.completions.create( |
|
model=openrouter_cfg.get("model_name"), |
|
messages=messages |
|
) |
|
observation = completion.choices[0].message.content.strip() |
|
api_summary = observation.replace("\n", " ") |
|
|
|
api_json = {} |
|
try: |
|
start = observation.find("{") |
|
end = observation.rfind("}") + 1 |
|
if start != -1 and end != -1: |
|
api_json = json.loads(observation[start:end]) |
|
else: |
|
api_json = {"raw": observation} |
|
except Exception: |
|
api_json = {"raw": observation} |
|
|
|
api_theft = api_json.get("theft", "") if isinstance(api_json, dict) else "" |
|
api_reason = api_json.get("reason", "") if isinstance(api_json, dict) else "" |
|
api_suspect = api_json.get("suspect_description", "") if isinstance(api_json, dict) else "" |
|
|
|
crop_path = "" |
|
email_sent = False |
|
email_ts = "" |
|
|
|
if api_theft and str(api_theft).lower() == "yes": |
|
if bbox: |
|
x1i, y1i, x2i, y2i = bbox |
|
|
|
frame = None |
|
try: |
|
frame = cv2.imread(image_path) |
|
except Exception: |
|
frame = None |
|
if frame is not None and x2i > x1i and y2i > y1i: |
|
crop = frame[y1i:y2i, x1i:x2i] |
|
ok, crop_buf = cv2.imencode('.jpg', crop) |
|
if ok: |
|
crop_bytes = crop_buf.tobytes() |
|
|
|
try: |
|
crop_path = os.path.join(out_dir, f"crop_{frame_idx}.jpg") |
|
with open(crop_path, "wb") as cf: |
|
cf.write(crop_bytes) |
|
except Exception: |
|
|
|
crop_path = "" |
|
|
|
details_html = "<ul>" |
|
if api_reason: |
|
details_html += f"<li><b>Reason:</b> {api_reason}</li>" |
|
if api_suspect: |
|
details_html += f"<li><b>Suspect:</b> {api_suspect}</li>" |
|
details_html += "</ul>" |
|
html_body = f""" |
|
<html><body> |
|
<h3>⚠ Alert - Suspicious activity detected</h3> |
|
{details_html} |
|
<p>Frame index: {frame_idx} | Confidence: {conf_val:.3f}</p> |
|
</body></html> |
|
""" |
|
if smtp_cfg and smtp_cfg.get("enabled"): |
|
success, info = send_email_with_attachment( |
|
smtp_cfg, |
|
"Shoplifting Alert", |
|
html_body, |
|
attachment_bytes=crop_bytes, |
|
attachment_filename=f"crop_{frame_idx}.jpg" |
|
) |
|
if success: |
|
email_sent = True |
|
email_ts = info |
|
|
|
|
|
ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) |
|
uid = f"frame_{frame_idx:06d}" |
|
bbox_str = "" |
|
x1 = y1 = x2 = y2 = "" |
|
if bbox: |
|
try: |
|
x1_val, y1_val, x2_val, y2_val = bbox |
|
x1 = int(x1_val); y1 = int(y1_val); x2 = int(x2_val); y2 = int(y2_val) |
|
bbox_str = f"{x1},{y1},{x2},{y2}" |
|
except Exception: |
|
bbox_str = "" |
|
x1 = y1 = x2 = y2 = "" |
|
|
|
|
|
notes = "" |
|
if api_summary: |
|
notes = api_summary |
|
else: |
|
|
|
if api_theft: |
|
notes = f"theft:{api_theft} reason:{api_reason}" |
|
else: |
|
notes = "" |
|
|
|
row = [ |
|
uid, ts, source_path, frame_idx, "shoplifting", |
|
f"{conf_val:.3f}" if conf_val is not None else "", |
|
bbox_str, x1, y1, x2, y2, |
|
image_path, crop_path, |
|
str(email_sent), email_ts, |
|
notes |
|
] |
|
with open(log_csv, "a", newline="", encoding="utf-8") as f: |
|
writer = csv.writer(f) |
|
writer.writerow(row) |
|
|
|
return api_json |
|
|
|
except Exception as e: |
|
print("[API ERROR]", e) |
|
ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) |
|
uid = f"frame_{frame_idx:06d}" |
|
bbox_str = "" |
|
x1 = y1 = x2 = y2 = "" |
|
try: |
|
if bbox: |
|
x1v, y1v, x2v, y2v = bbox |
|
x1 = x1v; y1 = y1v; x2 = x2v; y2 = y2v |
|
bbox_str = f"{x1},{y1},{x2},{y2}" |
|
except: |
|
pass |
|
|
|
row = [ |
|
uid, ts, source_path, frame_idx, "shoplifting", |
|
f"{conf_val:.3f}" if conf_val is not None else "", |
|
bbox_str, x1, y1, x2, y2, |
|
image_path, "", |
|
"False", "", |
|
f"api_error: {e}" |
|
] |
|
try: |
|
with open(log_csv, "a", newline="", encoding="utf-8") as f: |
|
writer = csv.writer(f) |
|
writer.writerow(row) |
|
except Exception as e2: |
|
print("[CSV WRITE ERROR]", e2) |
|
return {"error": str(e)} |
|
|
|
|
|
def process_image( |
|
image_path, |
|
model_path, |
|
out_root, |
|
openrouter_cfg=None, |
|
smtp_cfg=None, |
|
conf_thresh=0.5, |
|
confirm_conf_thresh=0.7 |
|
): |
|
""" |
|
Generator that yields updates during processing of a single image. |
|
Yields dicts with keys: status, live_frame, suspicious_list, csv_path, annotated_image (only final) |
|
""" |
|
_ensure_dir(out_root) |
|
suspicious_dir = os.path.join(out_root, "suspicious_images") |
|
_ensure_dir(suspicious_dir) |
|
|
|
csv_path = os.path.join(suspicious_dir, "suspicious_log.csv") |
|
CSV_HEADER = [ |
|
"id", "timestamp", "video_path", "frame_idx", "label", "model_conf", |
|
"bbox", "x1", "y1", "x2", "y2", "image_path", "crop_path", |
|
"email_sent", "email_timestamp", "notes" |
|
] |
|
|
|
|
|
with open(csv_path, "w", newline="", encoding="utf-8") as f: |
|
w = csv.writer(f) |
|
w.writerow(CSV_HEADER) |
|
|
|
|
|
yield {"status": "Loading model...", "live_frame": "", "suspicious_list": [], "csv_path": csv_path} |
|
model = YOLO(model_path) |
|
|
|
|
|
device = "cpu" |
|
try: |
|
import torch |
|
device = 0 if torch.cuda.is_available() else "cpu" |
|
except Exception: |
|
device = "cpu" |
|
|
|
|
|
if not os.path.exists(image_path): |
|
yield {"status": f"ERROR: Cannot open image {image_path}", "live_frame": "", "suspicious_list": [], "csv_path": csv_path} |
|
return |
|
|
|
frame_bgr = cv2.imread(image_path) |
|
if frame_bgr is None: |
|
yield {"status": f"ERROR: Failed to read image {image_path}", "live_frame": "", "suspicious_list": [], "csv_path": csv_path} |
|
return |
|
frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB).copy() |
|
|
|
yield {"status": "Starting inference on image...", "live_frame": "", "suspicious_list": [], "csv_path": csv_path} |
|
|
|
|
|
results = model.predict(source=image_path, device=device, conf=conf_thresh) |
|
|
|
annotated = frame_bgr.copy() |
|
suspicious_saved = [] |
|
last_live = "" |
|
detection_idx = 0 |
|
found_any = False |
|
|
|
|
|
for r in results: |
|
boxes, confs, cls_idx = [], [], [] |
|
if hasattr(r, "boxes") and r.boxes is not None: |
|
try: |
|
boxes = r.boxes.xyxy.cpu().numpy().tolist() |
|
confs = r.boxes.conf.cpu().numpy().tolist() |
|
cls_idx = r.boxes.cls.cpu().numpy().astype(int).tolist() |
|
except Exception: |
|
try: |
|
boxes = r.boxes.xyxy.tolist() |
|
confs = r.boxes.conf.tolist() |
|
cls_idx = [int(x) for x in getattr(r.boxes, "cls", [])] if getattr(r.boxes, "cls", None) is not None else [] |
|
except Exception: |
|
boxes, confs, cls_idx = [], [], [] |
|
|
|
highest_conf = 0.0 |
|
chosen_bbox = None |
|
|
|
for i, box in enumerate(boxes): |
|
conf = confs[i] if i < len(confs) else 0.0 |
|
cid = cls_idx[i] if i < len(cls_idx) else None |
|
label = str(cid) if cid is not None else "obj" |
|
try: |
|
if isinstance(model.names, dict): |
|
label = model.names.get(cid, str(cid)) |
|
else: |
|
label = model.names[cid] |
|
except Exception: |
|
pass |
|
|
|
if conf < conf_thresh: |
|
continue |
|
|
|
x1, y1, x2, y2 = [int(round(float(v))) for v in box[:4]] |
|
|
|
color = (0, 255, 0) |
|
if label.strip().lower() == "shoplifting": |
|
color = (0, 0, 255) |
|
if conf >= confirm_conf_thresh and conf > highest_conf: |
|
highest_conf = conf |
|
chosen_bbox = (x1, y1, x2, y2) |
|
|
|
|
|
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2) |
|
label_text = f"{label} {conf:.2f}" |
|
cv2.putText(annotated, label_text, (x1, max(15, y1 - 5)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2, cv2.LINE_AA) |
|
|
|
|
|
if label.strip().lower() == "shoplifting" and conf >= confirm_conf_thresh: |
|
found_any = True |
|
saved_name = f"img_{detection_idx:03d}_{int(time.time())}.jpg" |
|
saved_path = os.path.join(suspicious_dir, saved_name) |
|
|
|
cv2.imwrite(saved_path, annotated) |
|
suspicious_saved.append(saved_path) |
|
last_live = saved_path |
|
|
|
|
|
analyze_with_gemini_if_enabled(openrouter_cfg, saved_path, detection_idx, (x1, y1, x2, y2), |
|
conf, suspicious_dir, csv_path, image_path, smtp_cfg) |
|
|
|
|
|
yield {"status": f"Suspicious detection saved: {os.path.basename(saved_path)} (det {detection_idx})", |
|
"live_frame": last_live, |
|
"suspicious_list": suspicious_saved.copy(), |
|
"csv_path": csv_path} |
|
|
|
detection_idx += 1 |
|
|
|
|
|
|
|
|
|
annotated_image_path = os.path.join(out_root, f"annotated_{int(time.time())}.jpg") |
|
cv2.imwrite(annotated_image_path, annotated) |
|
|
|
|
|
if not found_any: |
|
|
|
ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) |
|
uid = "no_suspicious_000000" |
|
row = [ |
|
uid, ts, image_path, 0, "none", "", |
|
"", "", "", "", "", |
|
image_path, "", |
|
"False", "", |
|
"no_suspicious_detections" |
|
] |
|
with open(csv_path, "a", newline="", encoding="utf-8") as f: |
|
writer = csv.writer(f) |
|
writer.writerow(row) |
|
|
|
yield {"status": "No suspicious detections found.", "live_frame": last_live, "suspicious_list": suspicious_saved.copy(), "csv_path": csv_path} |
|
|
|
|
|
yield {"status": "Finished processing image.", "live_frame": last_live, "suspicious_list": suspicious_saved.copy(), "csv_path": csv_path, "annotated_image": annotated_image_path} |
|
|