# app/ui_streamlit.py # Ensure project root is on sys.path when Streamlit runs this as a script import sys, pathlib ROOT = pathlib.Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) import os, json from pathlib import Path from app.main import get_env, ensure_index_exists from app.search import search import streamlit as st st.markdown(""" """, unsafe_allow_html=True) # ── Streamlit config ────────────────────────────────────────────────────────── st.set_page_config(page_title="Grants Discovery App By Lupo", page_icon="🧭", layout="wide") # ── Theme & CSS (BLACK + ORANGE, dark selects) ──────────────────────────────── st.markdown(""" """, unsafe_allow_html=True) # ── Hero block (single) ─────────────────────────────────────────────────────── st.markdown("""

Grants Discovery Live RAG by Lupo

Find capacity-building grants fast.

""", unsafe_allow_html=True) # ── Hide developer diagnostics by default ───────────────────────────────────── SHOW_DEV = os.environ.get("SHOW_DEV") == "1" # ── Environment + index ─────────────────────────────────────────────────────── _env = get_env() ensure_index_exists(_env) # ---------- helpers ---------- def _dedup_records(rows): seen, out = set(), [] for r in rows or []: k = r.get("id") or r.get("url") or r.get("title") if not k or k in seen: continue seen.add(k) out.append(r) return out def _norm_list(v): if v is None: return [] if isinstance(v, str): parts = [p.strip() for p in v.replace(";", ",").split(",")] return [p.lower() for p in parts if p] if isinstance(v, (list, tuple, set)): return [str(x).lower() for x in v] return [] def _matches_filters(rec, geo_sel, cat_sel): rec_geo = _norm_list(rec.get("geo") or rec.get("region") or rec.get("state")) rec_cat = _norm_list(rec.get("categories") or rec.get("cats") or rec.get("category")) g_ok = (not geo_sel) or (set([g.lower() for g in geo_sel]) & set(rec_geo)) c_ok = (not cat_sel) or (set([c.lower() for c in cat_sel]) & set(rec_cat)) return g_ok and c_ok def _ministry_filter(rows): if not rows: return rows banned_terms = [ "broad agency announcement", "baa", "research", "r&d", "prototype", "laboratory", "university", "sbir", "sttr", "darpa", "office of naval research", "onr", "naval", "air force", "army", "w911", "n00014", "fa-", "afrl", "arpa" ] preferred_agencies = { "FTA", "HHS", "ACL", "USDA", "USDA-FNS", "USDA-RD", "DOL", "DOJ", "OJP", "OVW", "EDA", "HRSA", "SAMHSA", "CFPB", "HUD" } required_any_terms = [ "vehicle", "van", "bus", "paratransit", "mobility", "congregate meals", "home-delivered meals", "senior nutrition", "food pantry", "food bank", "hunger relief", "refrigeration", "freezer", "community", "faith", "church", "ministry", "nonprofit", "reentry", "workforce", "case management", "technical assistance" ] def txt(r): return " ".join([ str(r.get("title","")), str(r.get("synopsis") or r.get("summary") or ""), str(r.get("agency") or ""), ]).lower() kept = [] for r in rows: t = txt(r) if any(b in t for b in banned_terms): continue agency = (r.get("agency") or "").upper() cats = [c.lower() for c in (r.get("categories") or [])] is_preferred_agency = any(agency.startswith(a) for a in preferred_agencies) has_ministry_cue = any(term in t for term in required_any_terms) or any( c in {"transportation","vehicle","elderly","disabled","food","community","justice","reentry","workforce"} for c in cats ) if is_preferred_agency or has_ministry_cue: kept.append(r) return kept # ---------- end helpers ---------- # ---------- optional diagnostics ---------- with st.expander("Diagnostics (optional)", expanded=False): idx = Path(_env["INDEX_DIR"]) st.write("INDEX_DIR:", str(idx)) st.write("faiss.index exists:", (idx / "faiss.index").exists()) st.write("meta.json exists:", (idx / "meta.json").exists()) if (idx / "meta.json").exists(): try: meta = json.loads((idx / "meta.json").read_text()) st.write("meta.json count:", len(meta)) st.write("meta head:", [{"id": m.get("id"), "title": m.get("title")} for m in meta[:2]]) except Exception as e: st.error(f"Failed to read meta.json: {e!r}") try: demo = search("transportation", _env, top_k=3, filters={}) st.write("sample search('transportation') results:", len(demo)) if demo: st.write(demo[:3]) except Exception as e: st.error(f"search() raised: {e!r}") # ---------- end diagnostics ---------- st.title("Grants Discovery RAG (Capacity Building)") preset = st.radio( "Quick topic:", ["General", "Elderly", "Prison Ministry", "Evangelism", "Vehicles/Transport", "FTA 5310"], horizontal=True ) default_q = { "General": "capacity building", "Elderly": "capacity building for seniors and aging services", "Prison Ministry": "capacity building for reentry and prison ministry", "Evangelism": "capacity building for faith and community outreach", "Vehicles/Transport": "capacity building transportation vehicles vans buses mobility", "FTA 5310": "5310 Enhanced Mobility Seniors Individuals with Disabilities", }.get(preset, "capacity building") # --- controls --- q = st.text_input("Search query", value=default_q) geo = st.multiselect("Geo filter (optional)", options=["US", "MD", "PA"], default=[]) categories = st.multiselect( "Category filter (optional)", options=[ "capacity_building","elderly","prison_ministry","evangelism", "transportation","vehicle", "justice","reentry","victim_services","youth","women","food","workforce" ], default=[] ) top_k = st.slider("Results", 5, 50, 15) sort_by = st.selectbox("Sort by", ["Relevance", "Deadline (soonest first)"], index=0) only_open = st.checkbox("Only show opportunities with a future deadline", value=True) ministry_focus = st.checkbox("Ministry Focus (hide research/defense/academic BAAs)", value=True) # Build backend filters (if the search() supports them) backend_filters = {} if geo: backend_filters["geo"] = geo if categories: backend_filters["categories"] = categories col1, col2 = st.columns([1, 1]) with col1: if st.button("Search"): try: raw = search(q, _env, top_k=top_k, filters=backend_filters) dedup = _dedup_records(raw) # 1) Geo/Category client-side filter (fallback if backend ignores) if geo or categories: base_filtered = [r for r in dedup if _matches_filters(r, geo, categories)] else: base_filtered = dedup # 2) Only-open filter from datetime import date, datetime def _to_date_safe(val): if not val: return None try: return datetime.fromisoformat(str(val)).date() except Exception: return None open_filtered = base_filtered if only_open: open_filtered = [r for r in base_filtered if (_to_date_safe(r.get("deadline")) or date.max) >= date.today()] # 3) Ministry filter final_results = _ministry_filter(open_filtered) if ministry_focus else open_filtered # --- Step 1: clear any previous “show hidden” choice when filter is off if not ministry_focus and st.session_state.get("show_hidden"): st.session_state.pop("show_hidden", None) # --- Step 2: count how many items the ministry filter hid hidden_due_to_ministry = 0 if ministry_focus: hidden_due_to_ministry = len(open_filtered) - len(final_results) # Reset toggle on a new search run st.session_state.pop("show_hidden", None) # Save fully-filtered results st.session_state["results"] = final_results st.session_state["last_query"] = q st.session_state["last_filters"] = { "geo": geo, "categories": categories, "only_open": only_open, "ministry_focus": ministry_focus, } # Honest breakdown message (now includes hidden count) st.success( f"Found {len(dedup)} total • After geo/cat: {len(base_filtered)} • " f"Open-only: {len(open_filtered)} • Displaying: {len(final_results)}" + (f" • Hidden by ministry filter: {hidden_due_to_ministry}" if ministry_focus else "") ) # Checkbox to reveal hidden items without re-searching if ministry_focus and hidden_due_to_ministry > 0: if st.checkbox(f"Show hidden items ({hidden_due_to_ministry})", value=False, key="show_hidden"): st.session_state["results"] = open_filtered except Exception as e: st.error(str(e)) with col2: if st.button("Export Results to CSV"): results_for_export = st.session_state.get("results", []) if not results_for_export: st.warning("No results to export. Run a search first.") else: os.makedirs(_env["EXPORT_DIR"], exist_ok=True) out_path = os.path.join(_env["EXPORT_DIR"], "results.csv") import pandas as pd pd.DataFrame(results_for_export).to_csv(out_path, index=False) st.success(f"Exported to {out_path}") st.markdown("---") # ---- Sorting/filter helpers ---- from datetime import date, datetime def _to_date(d): if not d: return None try: return datetime.fromisoformat(str(d)).date() except Exception: return None # ---- Render results ---- results = st.session_state.get("results", []) # Apply sort if selected if sort_by.startswith("Deadline") and results: results.sort( key=lambda r: ( _to_date(r.get("deadline")) is None, _to_date(r.get("deadline")) or date.max, ) ) # Did the user run a search? ran_search = bool(st.session_state.get("last_query")) if results: st.caption(f"Results: {len(results)}") for r in results: title = r.get("title", "(no title)") url = r.get("url", "") cats = r.get("categories") or r.get("cats") or [] geo_tags = r.get("geo") or [] st.markdown(f"### {title}") st.write(f"**Source:** {r.get('source','')} | **Geo:** {', '.join(geo_tags) if isinstance(geo_tags, list) else geo_tags} | **Categories:** {', '.join(cats) if isinstance(cats, list) else cats}") if url and not url.startswith("http"): st.caption("Note: This item may display an ID or number instead of a full link. Open on Grants.gov if needed.") st.write(f"[Open Link]({url}) \nScore: {r.get('score', 0):.3f}") posted = r.get("posted_date") or "" deadline = r.get("deadline") or "" st.caption(f"Posted: {posted} • Deadline: {deadline}") st.markdown("---") else: if ran_search: st.info("No active grants match these filters right now. We’ll notify you when the next cycle opens.") else: st.info("Enter a query and click Search.")