# app.py — Smart Ovitrap Demo (Streamlit on Hugging Face Spaces)
# --------------------------------------------------------------
# Features
# - Auto-generate QR codes for traps (qrs/*.png) with deep links to hf.space
# - Camera capture + upload fallback; Together AI counting
# - Weekly KPIs (Coverage, OPI, EDI), trends, per-trap table
# - pydeck heat/points/labels map (enable by adding lat/lon in traps.csv)
# - Polished UI and safe error handling
import os, re, io, json, base64, datetime as dt, zipfile
from io import BytesIO
import numpy as np
import pandas as pd
import streamlit as st
import pydeck as pdk
from together import Together
import qrcode
# ---------- Page & Theme ----------
st.set_page_config(
    page_title="Smart Ovitrap Demo",
    page_icon="🦟",
    layout="wide",
)
# NOTE(review): the markup body is just a newline — it looks like custom
# style markup was lost at some point; confirm against the deployed app.
st.markdown("\n", unsafe_allow_html=True)

# ---------- Config ----------
TRAPS_CSV = "traps.csv"
COUNTS_CSV = "counts.csv"
TOGETHER_API_KEY = os.getenv("TOGETHER_API_KEY", "")

# The Together client is only built when a key is configured; otherwise it
# stays None and the counting code reports the missing key.
client = None
if TOGETHER_API_KEY:
    client = Together(api_key=TOGETHER_API_KEY)
# ---------- QR Utilities (run inside HF Space) ----------
def get_public_base_url() -> str:
    """
    Return the public base URL of this app, without a trailing slash.

    Resolution order:
    1. SPACE_HOST — the direct host name Hugging Face Spaces exposes to the
       running container (preferred: no guessing of the sub-domain).
    2. SPACE_ID ("owner/space") — host is derived as "<owner>-<space>.hf.space".
    3. OVITRAP_APP_URL — optional override for local development.

    Returns an empty string when none of the variables yields a URL.
    """
    host = os.getenv("SPACE_HOST", "").strip().rstrip("/")
    if host:
        # Tolerate a value that already carries a scheme.
        if host.startswith(("http://", "https://")):
            return host
        return f"https://{host}"

    owner, sep, space = os.getenv("SPACE_ID", "").partition("/")
    # Require both halves so a malformed id such as "user/" does not produce
    # a bogus host like "https://user-.hf.space".
    if sep and owner and space:
        # NOTE(review): the id is used verbatim; ids containing characters
        # that are not valid in a host name may need normalising — confirm.
        return f"https://{owner}-{space}.hf.space"

    return os.getenv("OVITRAP_APP_URL", "").rstrip("/")
def ensure_qrs_generated(traps_df: pd.DataFrame, base_url: str, out_dir: str = "qrs") -> int:
    """
    Write a QR PNG into out_dir for every trap ID that has no image yet.

    Each QR encodes "<base_url>/?trap_id=<id>" and is stored as "<id>.png".
    Existing files are left untouched. Returns how many files were written;
    0 when traps_df is empty or has no "trap_id" column.
    """
    has_ids = (not traps_df.empty) and ("trap_id" in traps_df.columns)
    if not has_ids:
        return 0

    os.makedirs(out_dir, exist_ok=True)
    created = 0
    for trap_id in traps_df["trap_id"].astype(str):
        target = os.path.join(out_dir, f"{trap_id}.png")
        if os.path.exists(target):
            # Already generated on an earlier run.
            continue
        qrcode.make(f"{base_url}/?trap_id={trap_id}").save(target)
        created += 1
    return created
def list_qrs(out_dir: str = "qrs") -> list[tuple[str, str]]:
    """
    List the QR images found in out_dir as (filename, deep-link URL) pairs.

    The trap ID is taken from the file name without its extension. The result
    is sorted by file name and is empty when no public base URL is known or
    out_dir does not exist.
    """
    root = get_public_base_url()
    if not (root and os.path.isdir(out_dir)):
        return []
    return [
        (fname, f"{root}/?trap_id={os.path.splitext(fname)[0]}")
        for fname in sorted(os.listdir(out_dir))
        if fname.lower().endswith(".png")
    ]
def zip_all_qrs(out_dir: str = "qrs") -> BytesIO:
    """
    Pack every regular file directly inside out_dir into an in-memory ZIP.

    Entries are stored under their bare file names, in sorted order.
    Sub-directories are skipped. A missing out_dir yields an empty archive.
    The returned buffer is rewound to position 0.
    """
    buffer = BytesIO()
    entries = sorted(os.listdir(out_dir)) if os.path.isdir(out_dir) else []
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        for entry in entries:
            full_path = os.path.join(out_dir, entry)
            if not os.path.isfile(full_path):
                continue
            archive.write(full_path, arcname=entry)
    buffer.seek(0)
    return buffer
# ---------- Data Helpers ----------
@st.cache_data
def load_traps() -> pd.DataFrame:
    """
    Load the trap registry from TRAPS_CSV (cached by Streamlit).

    Returns an empty frame with the columns trap_id/lat/lon/label when the
    file does not exist. When present, trap_id is always a string column.
    """
    if not os.path.exists(TRAPS_CSV):
        return pd.DataFrame(columns=["trap_id", "lat", "lon", "label"])

    # Read trap_id as text up front: letting pandas infer a numeric dtype
    # would turn an ID like "001" into 1 before it can be stringified.
    df = pd.read_csv(TRAPS_CSV, dtype={"trap_id": str})
    if "trap_id" in df.columns:
        # Kept so missing IDs still become the string "nan", as before.
        df["trap_id"] = df["trap_id"].astype(str)
    return df
def append_count(ts, week, trap_id, count, model, path=None):
    """
    Append one count record to the counts CSV.

    Args:
        ts: Timestamp of the reading (written as-is).
        week: Week label the reading belongs to.
        trap_id: Identifier of the trap.
        count: Egg/larvae count; coerced with int().
        model: Name of the model that produced the count.
        path: Optional target file; defaults to COUNTS_CSV.

    A header row (timestamp, week, trap_id, count, model) is written first
    when the file is new or empty, so pandas.read_csv can later recover the
    column names. Fields are written through the csv module so values that
    contain commas or quotes do not corrupt the row.

    Raises:
        TypeError / ValueError: if count cannot be converted to int.
    """
    import csv  # local import: only this function needs it

    target = COUNTS_CSV if path is None else path
    # Convert before touching the file so a bad count leaves nothing behind.
    row = [ts, week, trap_id, int(count), model]

    os.makedirs(os.path.dirname(target) or ".", exist_ok=True)
    needs_header = not os.path.exists(target) or os.path.getsize(target) == 0
    with open(target, "a", encoding="utf-8", newline="") as f:
        # "\n" keeps the line ending the file has always used.
        writer = csv.writer(f, lineterminator="\n")
        if needs_header:
            writer.writerow(["timestamp", "week", "trap_id", "count", "model"])
        writer.writerow(row)
@st.cache_data
def load_counts() -> pd.DataFrame:
    """
    Load all recorded counts from COUNTS_CSV (cached by Streamlit).

    Returns an empty frame with the columns timestamp/week/trap_id/count/model
    when the file does not exist. When present, trap_id is always a string
    column so it can be matched against the trap registry.
    """
    if not os.path.exists(COUNTS_CSV):
        return pd.DataFrame(columns=["timestamp", "week", "trap_id", "count", "model"])

    # Read identifiers and week labels as text: numeric inference would turn
    # an ID like "001" into 1 and could mix int/str week labels.
    df = pd.read_csv(COUNTS_CSV, dtype={"trap_id": str, "week": str})
    if "trap_id" in df.columns:
        # Kept so missing IDs still become the string "nan", as before.
        df["trap_id"] = df["trap_id"].astype(str)
    return df
# ---------- Together AI Counting ----------
def strict_prompt() -> str:
    """
    Return the instruction text sent to the vision model with each image.

    The prompt asks for a single JSON object holding an integer "count".
    The example object uses an explicit <integer> placeholder so the model
    is not shown syntactically invalid JSON to imitate.
    """
    return (
        "You are counting mosquito eggs/larvae on an ovitrap strip placed on a calibration card.\n"
        "Rules:\n"
        "1) Return ONLY a single JSON object exactly like: {\"count\": <integer>}.\n"
        "2) If unsure, give your best single integer estimate.\n"
        "3) Ignore dust, stains, or grid marks on the card.\n"
        "4) Count visible eggs/larvae only.\n"
        "Output JSON only. No extra text."
    )
def _parse_count(text: str) -> int:
    """Extract a non-negative integer "count" from the model's reply text."""
    # Try every {...} span: the first one is not always the valid object.
    for match in re.finditer(r"\{.*?\}", text, re.S):
        try:
            data = json.loads(match.group(0))
        except json.JSONDecodeError:
            continue
        if not isinstance(data, dict) or data.get("count") is None:
            continue
        try:
            value = int(data["count"])
        except (TypeError, ValueError, OverflowError):
            continue
        if value < 0:
            raise ValueError(f"Model returned a negative count: {value}")
        return value

    # Last resort: a bare `"count": 12` fragment inside otherwise invalid JSON.
    loose = re.search(r'"count"\s*:\s*"?(\d+)', text)
    if loose:
        return int(loose.group(1))
    raise ValueError(f"Model did not return JSON. Got: {text[:200]}...")


def call_together_count(image_bytes: bytes, model_name: str) -> int:
    """
    Ask a Together AI vision model to count eggs/larvae in an image.

    Args:
        image_bytes: Raw bytes of the photo (JPEG or PNG).
        model_name: Together model identifier to query.

    Returns:
        The count reported by the model, as a non-negative int.

    Raises:
        RuntimeError: if no Together client is configured.
        ValueError: if the image is empty, or the reply has no usable count.
    """
    if not client:
        raise RuntimeError("TOGETHER_API_KEY is not set (Space secret/env var).")
    if not image_bytes:
        raise ValueError("No image data to count.")

    # The uploader accepts PNG as well as JPEG, so label the data URL
    # according to the actual signature instead of always claiming JPEG.
    is_png = image_bytes.startswith(b"\x89PNG\r\n\x1a\n")
    mime = "image/png" if is_png else "image/jpeg"
    b64 = base64.b64encode(image_bytes).decode("utf-8")

    resp = client.chat.completions.create(
        model=model_name,
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": strict_prompt()},
                {"type": "image_url", "image_url": {"url": f"data:{mime};base64,{b64}"}},
            ],
        }],
        temperature=0.1,
    )
    if not getattr(resp, "choices", None):
        raise ValueError("Model returned no choices.")

    msg = resp.choices[0].message
    content = getattr(msg, "content", str(msg))
    if isinstance(content, list):
        # Multi-part replies: keep the text of each part, stringify the rest.
        content = "\n".join(
            str(p.get("text", p)) if isinstance(p, dict) else str(p) for p in content
        )
    elif content is None:
        content = ""
    elif not isinstance(content, str):
        content = str(content)

    return _parse_count(content)
# ---------- Session ----------
# Seed per-session state once; later reruns keep whatever is stored.
for _state_key, _initial in (("uploader_key", 0), ("last_photo_bytes", None)):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial

# ---------- Load Data & Auto-generate QRs ----------
traps_df = load_traps()

# Best effort: create any missing QR images now that the trap list is known.
try:
    _base = get_public_base_url()
    if _base:
        ensure_qrs_generated(traps_df, _base)
except Exception:
    # Deliberately silent so the app still starts if storage is ephemeral.
    pass
# ---------- Header ----------
st.title("🦟 Smart Ovitrap • Demo")
st.caption(
    "Scan a QR on an ovitrap to open this app with its Trap ID pre-filled. "
    "Capture the ovistrip image, count eggs/larvae with Together AI, and monitor weekly indices."
)

# ---------- Query Param: trap_id ----------
# Read ?trap_id=... from the URL; fall back to the legacy query-param API
# when the current one raises.
trap_id_from_qp = ""
try:
    trap_id_from_qp = st.query_params.get("trap_id", "")
    if isinstance(trap_id_from_qp, list):
        trap_id_from_qp = trap_id_from_qp[0] if trap_id_from_qp else ""
except Exception:
    _legacy_params = st.experimental_get_query_params()
    if "trap_id" in _legacy_params:
        trap_id_from_qp = _legacy_params["trap_id"][0]

# Default for the Trap ID field: URL value, else first known trap, else "T001".
if trap_id_from_qp:
    trap_default = trap_id_from_qp
elif not traps_df.empty:
    trap_default = traps_df["trap_id"].iloc[0]
else:
    trap_default = "T001"
# ---------- Sidebar ----------
st.sidebar.header("Settings")

# Vision models offered for counting; the first entry is preselected.
_vision_models = [
    "meta-llama/Llama-Vision-Free",
    "meta-llama/Llama-3.2-11B-Vision-Instruct-Turbo",
    "Qwen/Qwen2.5-VL-72B-Instruct",
]
model = st.sidebar.selectbox("Vision model", _vision_models, index=0)

_photo_tips = st.sidebar.expander("Photo tips", expanded=True)
with _photo_tips:
    st.markdown(
        "- Place strip within the calibration rectangle \n"
        "- Bright, even light; avoid glare/shadows \n"
        "- Keep phone parallel to the card \n"
        "- Retake if blurry",
    )
# ---------- Main Layout ----------
left, right = st.columns([1, 1], gap="large")

# ----- Left: Capture & Count -----
# NOTE(review): nesting below is reconstructed from the statement order —
# confirm that everything up to the count handler belongs in the container.
with left:
    st.markdown("### Capture")
    with st.container(border=True):
        trap_id = st.text_input("Trap ID (auto-filled via QR)", value=trap_default, key="trap_id_input")
        today = dt.date.today()
        y, w, _ = today.isocalendar()
        week = st.text_input("Week label", value=f"{y}-W{w:02d}", key="week_input")

        # Single-line literal: the hint was previously split across lines
        # inside a single-quoted string, which does not parse.
        st.markdown(
            "Use phone camera or upload a file. “Retake” clears the last photo.",
            unsafe_allow_html=True,
        )

        # Both input widgets share the retake counter in their key, so bumping
        # it on "Retake" resets the camera photo as well as the upload.
        cam = st.camera_input("Take a photo", key=f"camera_{st.session_state.uploader_key}")
        if cam is not None:
            st.session_state.last_photo_bytes = cam.getvalue()

        up = st.file_uploader("…or upload (jpg/png)", type=["jpg", "jpeg", "png"],
                              key=f"uploader_{st.session_state.uploader_key}")
        if up is not None:
            # getvalue() returns the full content on every rerun; read() would
            # return b"" once the stream position has reached the end.
            st.session_state.last_photo_bytes = up.getvalue()

        if st.session_state.last_photo_bytes:
            st.image(st.session_state.last_photo_bytes, caption="Preview", use_container_width=True)

        c1, c2 = st.columns(2)
        do_count = c1.button("Count with Together AI", type="primary",
                             disabled=not st.session_state.last_photo_bytes)
        do_retake = c2.button("Retake")

        if do_retake:
            st.session_state.uploader_key += 1
            st.session_state.last_photo_bytes = None
            st.rerun()

        if do_count:
            if not trap_id:
                st.error("Please enter a Trap ID.")
            elif not TOGETHER_API_KEY:
                st.error("Set a TOGETHER_API_KEY Space secret (or env var).")
            else:
                with st.spinner("Counting…"):
                    try:
                        count = call_together_count(st.session_state.last_photo_bytes, model)
                        ts = dt.datetime.now().isoformat(timespec="seconds")
                        append_count(ts, week, trap_id, count, model)
                        # Drop the cached frame so the dashboard sees the new row.
                        load_counts.clear()
                        st.success(f"✅ Count for {trap_id}: **{count}**")
                    except Exception as e:
                        st.exception(e)
# ----- QR Manager -----
# Lets the operator (re)generate QR images and download them singly or zipped.
with st.expander("🧾 QR Manager (for field demo)", expanded=False):
    base_url = get_public_base_url()
    if not base_url:
        # Example URL restored: the placeholder names had been lost, leaving
        # the unusable "https://-.hf.space".
        st.warning(
            "Public base URL not detected. On Hugging Face Spaces this is automatic. "
            "For local dev, set OVITRAP_APP_URL (e.g., `https://<user>-<space>.hf.space`)."
        )

    # Two columns keep the button at half width; the second stays empty.
    c1, _ = st.columns(2)
    if c1.button("Generate/Refresh all QR codes"):
        n = ensure_qrs_generated(traps_df, base_url or "http://127.0.0.1:8501")
        st.success(f"Generated {n} QR codes (missing only).")

    qrs = list_qrs()
    if not qrs:
        st.info("No QR images yet. Click **Generate/Refresh all QR codes**.")
    else:
        zip_bytes = zip_all_qrs()
        st.download_button("⬇️ Download all as ZIP", data=zip_bytes,
                           file_name="qrs.zip", mime="application/zip")
        st.caption("Individual QR codes:")
        for fname, deep_url in qrs:
            qr_path = os.path.join("qrs", fname)
            ci, cm = st.columns([1, 3])
            with ci:
                st.image(qr_path, width=140)
            with cm:
                st.write(f"**{fname}** → {deep_url}")
                with open(qr_path, "rb") as f:
                    st.download_button("Download PNG", data=f.read(),
                                       file_name=fname, mime="image/png",
                                       key=f"dl_{fname}")
# ----- Right: Dashboard -----
# Weekly dashboard rendered in the right-hand column: loads all counts, lets
# the user pick a week and derives coverage / OPI / EDI from the latest
# reading of each trap in that week.
# NOTE(review): this section appears truncated — the text between the third
# KPI card's `st.markdown(f"` and the following `""",` is missing, and the
# final `else:` has no visible matching `if`. Indentation is also absent.
# Restore from the original file before editing the logic.
with right:
st.markdown("### 📊 Weekly dashboard")
with st.container(border=True):
counts_df = load_counts()
# NOTE(review): as written, the sleep + rerun comes before any of the KPI
# code below — confirm the dashboard is meant to render before refreshing.
auto = st.checkbox("Auto-refresh every 10s", value=False, help="Handy during live entry")
if auto:
import time as _t
_t.sleep(10)
st.rerun()
if counts_df.empty:
st.info("No counts yet. Capture a photo and click **Count**.")
else:
# Week selector defaults to the last label in sorted order.
weeks = sorted(counts_df["week"].unique())
selected_week = st.selectbox("Week to view:", weeks, index=len(weeks) - 1)
week_df = counts_df[counts_df["week"] == selected_week].copy()
# Denominator: number of rows in traps_df, or the number of distinct traps
# seen this week when no trap list is loaded.
total_traps = len(traps_df) if not traps_df.empty else week_df["trap_id"].nunique()
# Keep only the last row per trap after sorting by timestamp.
latest_week = (
week_df.sort_values("timestamp")
.groupby("trap_id", as_index=False)
.tail(1)
)
traps_counted = latest_week["trap_id"].nunique()
traps_with_eggs = latest_week.loc[latest_week["count"] >= 1, "trap_id"].nunique()
# coverage: percentage of traps that have a reading in the selected week.
coverage = (traps_counted / total_traps * 100) if total_traps else 0.0
# edi: mean of the latest counts over all traps, with traps that have no
# reading this week filled in as 0.
edi = (
latest_week.set_index("trap_id")["count"]
.reindex(
traps_df["trap_id"] if not traps_df.empty else latest_week["trap_id"].unique(),
fill_value=0,
)
.mean()
)
# opi: percentage of traps whose latest count is at least 1.
opi = (traps_with_eggs / total_traps * 100) if total_traps else 0.0
# Three KPI cards side by side.
k1, k2, k3 = st.columns(3)
with k1:
st.markdown(f"
Coverage {coverage:.0f}%
", unsafe_allow_html=True)
with k2:
st.markdown(f"
OPI (% ≥1) {opi:.1f}%
", unsafe_allow_html=True)
with k3:
st.markdown(f"
""",
unsafe_allow_html=True,
)
else:
st.info("Add lat/lon to traps.csv to enable the map.")
# ---------- Footer ----------
# NOTE(review): the markup body is empty — it looks like a separator element
# was lost at some point; confirm against the deployed app.
st.markdown(body="", unsafe_allow_html=True)
st.caption(body="Built for demonstration • Hugging Face Spaces • Streamlit • Together AI")