# app.py — Smart Ovitrap Demo (Streamlit on Hugging Face Spaces)
# --------------------------------------------------------------
# Features
# - Auto-generate QR codes for traps (qrs/*.png) with deep links to hf.space
# - Camera capture + upload fallback; Together AI counting
# - Weekly KPIs (Coverage, OPI, EDI), trends, per-trap table
# - pydeck heat/points/labels map (enable by adding lat/lon in traps.csv)
# - Polished UI and safe error handling

import base64
import csv
import datetime as dt
import io
import json
import logging
import os
import re
import time
import zipfile
from io import BytesIO
from typing import Optional
from urllib.parse import quote

import numpy as np
import pandas as pd
import pydeck as pdk
import qrcode
import streamlit as st
from together import Together

logger = logging.getLogger(__name__)

# ---------- Page & Theme ----------
st.set_page_config(page_title="Smart Ovitrap Demo", layout="wide", page_icon="🦟")
# NOTE(review): only whitespace is visible inside this markup string in the
# version under review; if a custom <style> block belongs here, restore it.
st.markdown(
    """
    """,
    unsafe_allow_html=True,
)

# ---------- Config ----------
TRAPS_CSV = "traps.csv"
COUNTS_CSV = "counts.csv"
COUNT_COLUMNS = ["timestamp", "week", "trap_id", "count", "model"]
QR_DIR = "qrs"
AUTO_REFRESH_SECONDS = 10
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"
TOGETHER_API_KEY = os.getenv("TOGETHER_API_KEY", "")
client = Together(api_key=TOGETHER_API_KEY) if TOGETHER_API_KEY else None


# ---------- QR Utilities (run inside HF Space) ----------
def get_public_base_url() -> str:
    """
    Compute the direct hf.space URL from SPACE_ID.
    Fallback to OVITRAP_APP_URL (optional override for local dev).
    Returns an empty string when neither variable yields a URL.
    """
    sid = os.getenv("SPACE_ID", "")
    if "/" in sid:
        user, space = sid.split("/", 1)
        return f"https://{user}-{space}.hf.space"
    return os.getenv("OVITRAP_APP_URL", "").rstrip("/")


def trap_deep_link(base_url: str, trap_id: str) -> str:
    """Return the URL that opens the app with *trap_id* in the query string."""
    # Percent-encode so IDs containing spaces, '&' or '#' survive the round trip;
    # plain alphanumeric IDs are left unchanged.
    return f"{base_url}/?trap_id={quote(str(trap_id), safe='')}"


def ensure_qrs_generated(traps_df: pd.DataFrame, base_url: str, out_dir: str = "qrs") -> int:
    """
    Create QR PNGs for any missing trap IDs in traps_df into out_dir.
    Existing files are left untouched.
    Returns number of files created.
    """
    if traps_df.empty or "trap_id" not in traps_df.columns:
        return 0
    os.makedirs(out_dir, exist_ok=True)
    made = 0
    for trap in traps_df["trap_id"].astype(str):
        path = os.path.join(out_dir, f"{trap}.png")
        if os.path.exists(path):
            continue
        qrcode.make(trap_deep_link(base_url, trap)).save(path)
        made += 1
    return made


def list_qrs(out_dir: str = "qrs") -> list[tuple[str, str]]:
    """
    Return list of (filename, url) for existing QR images.
    The trap ID is taken from the file name (without extension).
    Returns an empty list when no public base URL or no out_dir exists.
    """
    base_url = get_public_base_url()
    if not base_url or not os.path.isdir(out_dir):
        return []
    items = []
    for name in sorted(os.listdir(out_dir)):
        if name.lower().endswith(".png"):
            trap = os.path.splitext(name)[0]
            items.append((name, trap_deep_link(base_url, trap)))
    return items


def zip_all_qrs(out_dir: str = "qrs") -> BytesIO:
    """
    Create an in-memory zip of the regular files directly inside out_dir.
    The returned buffer is rewound to position 0; it holds an empty archive
    when out_dir does not exist.
    """
    mem = BytesIO()
    with zipfile.ZipFile(mem, "w", zipfile.ZIP_DEFLATED) as zf:
        if os.path.isdir(out_dir):
            for name in sorted(os.listdir(out_dir)):
                p = os.path.join(out_dir, name)
                if os.path.isfile(p):
                    zf.write(p, arcname=name)
    mem.seek(0)
    return mem


# ---------- Data Helpers ----------
@st.cache_data
def load_traps() -> pd.DataFrame:
    """Load traps.csv (trap_id coerced to str), or an empty frame if absent."""
    if os.path.exists(TRAPS_CSV):
        df = pd.read_csv(TRAPS_CSV)
        if "trap_id" in df.columns:
            df["trap_id"] = df["trap_id"].astype(str)
        return df
    return pd.DataFrame(columns=["trap_id", "lat", "lon", "label"])


def registered_trap_ids(traps: pd.DataFrame) -> list[str]:
    """Return the trap IDs listed in *traps*, or [] if there are none."""
    if traps.empty or "trap_id" not in traps.columns:
        return []
    return traps["trap_id"].astype(str).tolist()


def append_count(ts, week, trap_id, count, model):
    """
    Append one count record to COUNTS_CSV.

    A header row is written first when the file is new or empty, because
    load_counts() parses the file with pandas, which takes the first row as
    column names. Fields go through csv.writer so commas or quotes typed into
    the trap ID / week label cannot break the row layout.
    """
    os.makedirs(os.path.dirname(COUNTS_CSV) or ".", exist_ok=True)
    needs_header = not os.path.exists(COUNTS_CSV) or os.path.getsize(COUNTS_CSV) == 0
    with open(COUNTS_CSV, "a", encoding="utf-8", newline="") as f:
        writer = csv.writer(f, lineterminator="\n")
        if needs_header:
            writer.writerow(COUNT_COLUMNS)
        writer.writerow([ts, week, trap_id, int(count), model])


@st.cache_data
def load_counts() -> pd.DataFrame:
    """
    Load COUNTS_CSV as a DataFrame with the COUNT_COLUMNS layout.

    Returns an empty frame when the file is missing or has no content.
    Files written without a header row are re-read with the expected names.
    """
    empty = pd.DataFrame(columns=COUNT_COLUMNS)
    if not os.path.exists(COUNTS_CSV):
        return empty
    try:
        df = pd.read_csv(COUNTS_CSV)
        if "trap_id" not in df.columns and df.shape[1] == len(COUNT_COLUMNS):
            # Headerless file: the first record was consumed as column names.
            df = pd.read_csv(COUNTS_CSV, header=None, names=COUNT_COLUMNS)
    except pd.errors.EmptyDataError:
        return empty
    if "trap_id" in df.columns:
        df["trap_id"] = df["trap_id"].astype(str)
    if "week" in df.columns:
        # Week labels are free text; keep them all as str so sorting cannot
        # fail on mixed int/str values.
        df["week"] = df["week"].astype(str)
    if "count" in df.columns:
        df["count"] = pd.to_numeric(df["count"], errors="coerce").fillna(0)
    return df


def latest_count_per_trap(week_df: pd.DataFrame) -> pd.DataFrame:
    """Return the most recent row (by timestamp) for each trap_id."""
    # Stable sort: rows sharing a timestamp keep file order, so the row
    # appended last wins.
    ordered = week_df.sort_values("timestamp", kind="stable")
    return ordered.groupby("trap_id", as_index=False).tail(1)


def compute_week_kpis(latest: pd.DataFrame, traps: pd.DataFrame) -> dict:
    """
    Compute the weekly indicators from one-row-per-trap data.

    The denominator is the registered trap list when traps.csv has entries,
    otherwise the traps that reported. Traps without a report count as 0.

    Returns a dict with:
      coverage: % of traps with a report
      opi:      % of traps with count >= 1
      edi:      mean count per trap
    """
    counted_ids = latest["trap_id"].unique()
    registered = registered_trap_ids(traps)
    denominator_ids = registered if registered else list(counted_ids)
    total = len(denominator_ids)
    if not total:
        return {"coverage": 0.0, "opi": 0.0, "edi": 0.0}
    positive = latest.loc[latest["count"] >= 1, "trap_id"].nunique()
    per_trap = latest.set_index("trap_id")["count"].reindex(denominator_ids, fill_value=0)
    return {
        "coverage": len(counted_ids) / total * 100,
        "opi": positive / total * 100,
        "edi": float(per_trap.mean()),
    }


# ---------- Together AI Counting ----------
def strict_prompt() -> str:
    """Return the instruction text sent with every image."""
    return (
        "You are counting mosquito eggs/larvae on an ovitrap strip placed on a calibration card.\n"
        "Rules:\n"
        "1) Return ONLY a single JSON object exactly like: {\"count\": <integer>}.\n"
        "2) If unsure, give your best single integer estimate.\n"
        "3) Ignore dust, stains, or grid marks on the card.\n"
        "4) Count visible eggs/larvae only.\n"
        "Output JSON only. No extra text."
    )


def _image_mime(image_bytes: bytes) -> str:
    """Return 'image/png' for PNG data, otherwise 'image/jpeg'."""
    return "image/png" if image_bytes.startswith(PNG_SIGNATURE) else "image/jpeg"


def _parse_count_reply(content) -> int:
    """
    Extract the integer count from the model reply.

    Raises ValueError when the reply has no JSON object, the JSON is invalid,
    or the "count" value is missing, non-numeric or negative.
    """
    if isinstance(content, list):
        # Multi-part replies: keep the text of each part.
        content = "\n".join(
            str(p.get("text", "")) if isinstance(p, dict) else str(p) for p in content
        )
    content = "" if content is None else str(content)
    m = re.search(r"\{.*?\}", content, re.S)
    if not m:
        raise ValueError(f"Model did not return JSON. Got: {content[:200]}...")
    data = json.loads(m.group(0))  # JSONDecodeError is a ValueError
    try:
        count = int(data["count"])
    except (KeyError, TypeError, ValueError) as err:
        raise ValueError(f"Model JSON has no usable 'count'. Got: {m.group(0)[:200]}") from err
    if count < 0:
        raise ValueError(f"Model returned a negative count: {count}")
    return count


def call_together_count(image_bytes: bytes, model_name: str) -> int:
    """
    Send the image to the selected Together AI vision model and return the count.

    Raises RuntimeError when no API key is configured and ValueError when the
    reply cannot be parsed; errors raised by the client itself propagate.
    """
    if not client:
        raise RuntimeError("TOGETHER_API_KEY is not set (Space secret/env var).")
    b64 = base64.b64encode(image_bytes).decode("utf-8")
    data_url = f"data:{_image_mime(image_bytes)};base64,{b64}"
    resp = client.chat.completions.create(
        model=model_name,
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": strict_prompt()},
                {"type": "image_url", "image_url": {"url": data_url}},
            ],
        }],
        temperature=0.1,
    )
    msg = resp.choices[0].message
    return _parse_count_reply(getattr(msg, "content", str(msg)))


# ---------- Map ----------
def risk_color(c):
    """Map a count to the RGBA colour used for the trap marker."""
    if c >= 50:
        return [220, 50, 47, 210]  # red
    if c >= 10:
        return [253, 185, 80, 210]  # amber
    if c >= 1:
        return [92, 184, 92, 210]  # green
    return [180, 180, 180, 160]  # grey


def build_trap_map(traps: pd.DataFrame, latest: pd.DataFrame) -> Optional[pdk.Deck]:
    """
    Build the pydeck map (tiles, hexagons, points, labels) for the given week.

    Returns None when no trap has usable coordinates.
    """
    df = traps.merge(latest[["trap_id", "count"]], on="trap_id", how="left").fillna({"count": 0})
    df["lat"] = pd.to_numeric(df["lat"], errors="coerce")
    df["lon"] = pd.to_numeric(df["lon"], errors="coerce")
    # Rows without coordinates cannot be drawn and would turn the view centre
    # into NaN.
    df = df.dropna(subset=["lat", "lon"]).copy()
    if df.empty:
        return None

    df["count"] = pd.to_numeric(df["count"], errors="coerce").fillna(0).astype(float)
    df["label_text"] = df["trap_id"].astype(str) + ": " + df["count"].astype(int).astype(str)
    # The tooltip template references {label}; make sure the field exists.
    if "label" not in df.columns:
        df["label"] = ""
    df["label"] = df["label"].fillna("")
    df["rgba"] = df["count"].map(risk_color)
    df["radius"] = (36 + df["count"] * 2).clip(24, 180)

    color_range = [
        [255, 255, 204], [255, 237, 160], [254, 217, 118],
        [254, 178, 76], [253, 141, 60], [240, 59, 32], [189, 0, 38],
    ]
    hex_layer = pdk.Layer(
        "HexagonLayer",
        data=df,
        get_position=["lon", "lat"],
        radius=150,
        elevation_scale=12,
        elevation_range=[0, 3000],
        extruded=True,
        coverage=1,
        pickable=True,
        get_elevation_weight="count",
        elevation_aggregation="SUM",
        get_color_weight="count",
        color_aggregation="SUM",
        color_range=color_range,
        opacity=0.45,
    )
    points = pdk.Layer(
        "ScatterplotLayer",
        data=df,
        get_position=["lon", "lat"],
        get_radius="radius",
        get_fill_color="rgba",
        stroked=True,
        get_line_color=[255, 255, 255],
        line_width_min_pixels=1,
        pickable=True,
    )
    labels = pdk.Layer(
        "TextLayer",
        data=df,
        get_position=["lon", "lat"],
        get_text="label_text",
        get_size=14,
        get_color=[30, 30, 30, 220],
        get_alignment_baseline="bottom",
    )
    osm = pdk.Layer(
        "TileLayer",
        data="https://a.tile.openstreetmap.org/{z}/{x}/{y}.png",
        min_zoom=0,
        max_zoom=19,
        tile_size=256,
    )
    view = pdk.ViewState(
        latitude=float(df["lat"].mean()),
        longitude=float(df["lon"].mean()),
        zoom=13.5,
    )
    return pdk.Deck(
        map_style=None,
        initial_view_state=view,
        layers=[osm, hex_layer, points, labels],
        tooltip={"text": "Trap {trap_id}\nCount: {count}\n{label}"},
    )


# ---------- Query Param ----------
def read_trap_id_from_query() -> str:
    """Return the trap_id query parameter, or '' when it is absent."""
    try:
        value = st.query_params.get("trap_id", "")
    except AttributeError:
        # Streamlit versions without st.query_params.
        value = st.experimental_get_query_params().get("trap_id", "")
    if isinstance(value, list):
        value = value[0] if value else ""
    return str(value or "")


# ---------- Session ----------
if "uploader_key" not in st.session_state:
    st.session_state.uploader_key = 0
if "last_photo_bytes" not in st.session_state:
    st.session_state.last_photo_bytes = None

# ---------- Load Data & Auto-generate QRs ----------
traps_df = load_traps()

# Generate missing QRs once we have traps_df
try:
    _base = get_public_base_url()
    if _base:
        ensure_qrs_generated(traps_df, _base)
except Exception:
    # Best effort: never block the app if storage is ephemeral/read-only,
    # but leave a trace in the logs instead of failing silently.
    logger.warning("QR auto-generation failed", exc_info=True)

# ---------- Header ----------
st.title("🦟 Smart Ovitrap • Demo")
st.caption(
    "Scan a QR on an ovitrap to open this app with its Trap ID pre-filled. "
    "Capture the ovistrip image, count eggs/larvae with Together AI, and monitor weekly indices."
)

# ---------- Query Param: trap_id ----------
trap_id_from_qp = read_trap_id_from_query()
_known_ids = registered_trap_ids(traps_df)
trap_default = trap_id_from_qp or (_known_ids[0] if _known_ids else "T001")

# ---------- Sidebar ----------
st.sidebar.header("Settings")
model = st.sidebar.selectbox(
    "Vision model",
    [
        "meta-llama/Llama-Vision-Free",
        "meta-llama/Llama-3.2-11B-Vision-Instruct-Turbo",
        "Qwen/Qwen2.5-VL-72B-Instruct",
    ],
    index=0,
)
with st.sidebar.expander("Photo tips", expanded=True):
    st.markdown(
        "- Place strip within the calibration rectangle \n"
        "- Bright, even light; avoid glare/shadows \n"
        "- Keep phone parallel to the card \n"
        "- Retake if blurry",
    )

# ---------- Main Layout ----------
left, right = st.columns([1, 1], gap="large")

# ----- Left: Capture & Count -----
with left:
    st.markdown("### Capture")
    with st.container(border=True):
        trap_id = st.text_input("Trap ID (auto-filled via QR)", value=trap_default, key="trap_id_input")
        today = dt.date.today()
        y, w, _ = today.isocalendar()
        week = st.text_input("Week label", value=f"{y}-W{w:02d}", key="week_input")
        st.markdown(
            "\nUse phone camera or upload a file. “Retake” clears the last photo.\n",
            unsafe_allow_html=True,
        )

        # Both capture widgets share the reset counter: bumping it on "Retake"
        # gives them new keys, so neither re-supplies the discarded photo.
        widget_nonce = st.session_state.uploader_key
        cam = st.camera_input("Take a photo", key=f"camera_{widget_nonce}")
        if cam is not None:
            st.session_state.last_photo_bytes = cam.getvalue()
        up = st.file_uploader(
            "…or upload (jpg/png)",
            type=["jpg", "jpeg", "png"],
            key=f"uploader_{widget_nonce}",
        )
        if up is not None:
            # getvalue() returns the full content regardless of read position.
            st.session_state.last_photo_bytes = up.getvalue()

        if st.session_state.last_photo_bytes:
            st.image(st.session_state.last_photo_bytes, caption="Preview", use_container_width=True)

        c1, c2 = st.columns(2)
        do_count = c1.button(
            "Count with Together AI",
            type="primary",
            disabled=not st.session_state.last_photo_bytes,
        )
        do_retake = c2.button("Retake")

        if do_retake:
            st.session_state.uploader_key += 1
            st.session_state.last_photo_bytes = None
            st.rerun()

        if do_count:
            trap_id_clean = trap_id.strip()
            if not trap_id_clean:
                st.error("Please enter a Trap ID.")
            elif not TOGETHER_API_KEY:
                st.error("Set a TOGETHER_API_KEY Space secret (or env var).")
            else:
                with st.spinner("Counting…"):
                    try:
                        count = call_together_count(st.session_state.last_photo_bytes, model)
                        ts = dt.datetime.now().isoformat(timespec="seconds")
                        append_count(ts, week.strip(), trap_id_clean, count, model)
                        load_counts.clear()
                        st.success(f"✅ Count for {trap_id_clean}: **{count}**")
                    except Exception as e:
                        # UI boundary: show any API/parsing/storage failure
                        # to the operator instead of crashing the page.
                        st.exception(e)

    # ----- QR Manager -----
    with st.expander("🧾 QR Manager (for field demo)", expanded=False):
        base_url = get_public_base_url()
        if not base_url:
            st.warning(
                "Public base URL not detected. On Hugging Face Spaces this is automatic. "
                "For local dev, set OVITRAP_APP_URL (e.g., https://<user>-<space>.hf.space)."
            )
        # Second column is intentionally empty; it keeps the button half-width.
        gen_col, _spacer = st.columns(2)
        if gen_col.button("Generate/Refresh all QR codes"):
            n = ensure_qrs_generated(traps_df, base_url or "http://127.0.0.1:8501")
            st.success(f"Generated {n} QR codes (missing only).")

        qrs = list_qrs()
        if not qrs:
            st.info("No QR images yet. Click **Generate/Refresh all QR codes**.")
        else:
            zip_bytes = zip_all_qrs()
            st.download_button(
                "⬇️ Download all as ZIP",
                data=zip_bytes,
                file_name="qrs.zip",
                mime="application/zip",
            )
            st.caption("Individual QR codes:")
            for fname, deep_url in qrs:
                with open(os.path.join(QR_DIR, fname), "rb") as f:
                    png_bytes = f.read()
                ci, cm = st.columns([1, 3])
                with ci:
                    st.image(png_bytes, width=140)
                with cm:
                    st.write(f"**{fname}** → {deep_url}")
                    st.download_button(
                        "Download PNG",
                        data=png_bytes,
                        file_name=fname,
                        mime="image/png",
                        key=f"dl_{fname}",
                    )

# ----- Right: Dashboard -----
with right:
    st.markdown("### 📊 Weekly dashboard")
    with st.container(border=True):
        counts_df = load_counts()
        # The actual sleep + rerun happens at the very end of the script so
        # the dashboard is drawn before the page refreshes.
        auto = st.checkbox(
            f"Auto-refresh every {AUTO_REFRESH_SECONDS}s",
            value=False,
            help="Handy during live entry",
        )

        if counts_df.empty:
            st.info("No counts yet. Capture a photo and click **Count**.")
        else:
            weeks = sorted(counts_df["week"].unique())
            selected_week = st.selectbox("Week to view:", weeks, index=len(weeks) - 1)
            week_df = counts_df[counts_df["week"] == selected_week]
            latest_week = latest_count_per_trap(week_df)
            kpis = compute_week_kpis(latest_week, traps_df)

            k1, k2, k3 = st.columns(3)
            with k1:
                st.markdown(f"\nCoverage\n{kpis['coverage']:.0f}%\n", unsafe_allow_html=True)
            with k2:
                st.markdown(f"\nOPI (% ≥1)\n{kpis['opi']:.1f}%\n", unsafe_allow_html=True)
            with k3:
                st.markdown(f"\nEDI (eggs/trap)\n{kpis['edi']:.1f}\n", unsafe_allow_html=True)

            # Trend across all weeks
            trend = []
            for wk in weeks:
                wk_latest = latest_count_per_trap(counts_df[counts_df["week"] == wk])
                wk_kpis = compute_week_kpis(wk_latest, traps_df)
                trend.append({"week": wk, "OPI": wk_kpis["opi"], "EDI": wk_kpis["edi"]})
            st.write("Trend (by week):")
            if trend:
                st.line_chart(pd.DataFrame(trend).set_index("week"))

            st.write("Per-trap (latest) this week:")
            show = latest_week.sort_values("count", ascending=False).reset_index(drop=True)
            st.dataframe(show, use_container_width=True)

            # Map (requires lat/lon in traps.csv)
            if not traps_df.empty and {"trap_id", "lat", "lon"}.issubset(traps_df.columns):
                deck = build_trap_map(traps_df, latest_week)
                if deck is not None:
                    st.subheader("🗺️ Kolkata hotspots (OpenStreetMap)")
                    st.pydeck_chart(deck, use_container_width=True)
                    st.markdown(
                        "\n"
                        "Legend:\n"
                        "Low (1–9)\n"
                        "Amber (10–49)\n"
                        "Red (≥50)\n"
                        "No eggs\n"
                        "Hex height/color = total eggs in area\n",
                        unsafe_allow_html=True,
                    )
            else:
                st.info("Add lat/lon to traps.csv to enable the map.")

# ---------- Footer ----------
st.markdown("\n", unsafe_allow_html=True)
st.caption("Built for demonstration • Hugging Face Spaces • Streamlit • Together AI")

# ---------- Auto-refresh ----------
# Must stay last: st.rerun() stops the current run immediately, so anything
# placed after it would never be rendered while auto-refresh is enabled.
if auto:
    time.sleep(AUTO_REFRESH_SECONDS)
    st.rerun()