# app/ingest.py from __future__ import annotations import json from pathlib import Path from typing import Dict, List, Any import yaml import numpy as np from sentence_transformers import SentenceTransformer from app.paths import DOCSTORE_DIR, INDEX_DIR from .normalize import normalize # ← central normalizer # -------------------- Config -------------------- def load_config(cfg_path: str) -> Dict: with open(cfg_path, "r", encoding="utf-8") as f: return yaml.safe_load(f) # -------------------- Grants.gov collector -------------------- def _collect_from_grantsgov_api(src: Dict) -> List[Dict[str, Any]]: """ Calls the Grants.gov Search2 client and returns a list of RAW dicts (adapter may already be close to unified; we'll still run normalize()). """ from app.sources.grantsgov_api import search_grants # local import to avoid cycles api = src.get("api", {}) page_size = int(api.get("page_size", src.get("page_size", 100))) max_pages = int(api.get("max_pages", src.get("max_pages", 5))) payload = api.get("payload", src.get("payload", {})) url = src.get("url", "") out = search_grants(url, payload, page_size=page_size, max_pages=max_pages) hits = out.get("hits", []) if isinstance(out, dict) else (out or []) return [h for h in hits if isinstance(h, dict)] # -------------------- Write docstore & build index -------------------- def _save_docstore(recs: List[Dict[str, Any]]) -> str: DOCSTORE_DIR.mkdir(parents=True, exist_ok=True) path = DOCSTORE_DIR / "docstore.jsonl" with path.open("w", encoding="utf-8") as f: for r in recs: f.write(json.dumps(r, ensure_ascii=False) + "\n") return str(path) def _build_index_from_docstore() -> int: ds_path = DOCSTORE_DIR / "docstore.jsonl" if not ds_path.exists(): raise RuntimeError("Docstore not found. Run ingest first.") # Load records → texts + metas texts: List[str] = [] metas: List[Dict[str, Any]] = [] with ds_path.open("r", encoding="utf-8") as f: for line in f: rec = json.loads(line) title = rec.get("title") or "" synopsis = rec.get("synopsis") or rec.get("summary") or "" agency = rec.get("agency") or "" eligibility = rec.get("eligibility") or "" txt = "\n".join([title, synopsis, agency, eligibility]).strip() if not txt: continue texts.append(txt) metas.append({ "id": rec.get("id"), "title": title, "url": rec.get("url"), "source": rec.get("source"), "geo": rec.get("geo"), "categories": rec.get("categories"), "agency": agency, "deadline": rec.get("deadline"), "program_number": rec.get("program_number"), "posted_date": rec.get("posted_date"), }) print(f"[index] Rows loaded from docstore: {len(texts)}") if not texts: # Write an empty index file so downstream UI can still boot gracefully (INDEX_DIR).mkdir(parents=True, exist_ok=True) (INDEX_DIR / "meta.json").write_text(json.dumps([], ensure_ascii=False)) print("[index] No texts to embed. Wrote empty meta.json.") return 0 # Embed (CPU default; keeps it portable) model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") model.max_seq_length = 256 batch = max(8, min(32, len(texts))) # sensible batch size for small corpora emb = model.encode( texts, convert_to_numpy=True, normalize_embeddings=True, show_progress_bar=True, batch_size=batch, ).astype(np.float32, copy=False) # FAISS index (Inner Product for cosine on normalized vectors) import faiss dim = emb.shape[1] index = faiss.IndexFlatIP(dim) index.add(emb) INDEX_DIR.mkdir(parents=True, exist_ok=True) faiss.write_index(index, str(INDEX_DIR / "faiss.index")) (INDEX_DIR / "meta.json").write_text(json.dumps(metas, ensure_ascii=False)) print(f"[index] Wrote FAISS index with {emb.shape[0]} vectors (dim={dim}).") return len(texts) # -------------------- Ingest main -------------------- def ingest(cfg_path: str = "config/sources.yaml", env: Dict | None = None): """ Reads config, fetches from enabled sources, normalizes with a single map, attaches categories/geo consistently, DEDUPEs, and builds the index. """ cfg = load_config(cfg_path) all_rows: List[Dict[str, Any]] = [] for entry in cfg.get("sources", []): if not entry.get("enabled"): continue name = entry.get("name", "") geo = entry.get("geo") or "US" cats = entry.get("categories") or [] static = {"geo": geo, "categories": cats} typ = entry.get("type") rows: List[Dict[str, Any]] = [] if typ == "grantsgov_api": raw_hits = _collect_from_grantsgov_api(entry) rows = [normalize("grants_gov", h, static) for h in raw_hits] elif typ == "local_sample": p = Path(entry["path"]).expanduser() blob = json.loads(p.read_text(encoding="utf-8")) items = blob.get("opportunities") or [] rows = [normalize("local_sample", op, static) for op in items] else: # Future adapters (doj_ojp, state_md, web_page, json_static, …) rows = [] print(f"[collect] {name} → {len(rows)} rows") all_rows.extend(rows) # ---- DEDUPE (id → url → title) ---- seen, unique = set(), [] for r in all_rows: key = r.get("id") or r.get("url") or r.get("title") if not key or key in seen: continue seen.add(key) unique.append(r) print(f"[ingest] Unique records to index: {len(unique)}") path = _save_docstore(unique) n = _build_index_from_docstore() return path, n if __name__ == "__main__": import argparse ap = argparse.ArgumentParser() ap.add_argument("--config", default="config/sources.yaml") args = ap.parse_args() p, n = ingest(args.config) print(f"Ingested {n} records. Docstore at {p}")