# app/ingest.py
from __future__ import annotations
import json
from pathlib import Path
from typing import Dict, List, Any
import yaml
import numpy as np
from sentence_transformers import SentenceTransformer
from app.paths import DOCSTORE_DIR, INDEX_DIR
from .normalize import normalize # ← central normalizer

# -------------------- Config --------------------
def load_config(cfg_path: str) -> Dict:
    with open(cfg_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)
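
# A sketch of the config shape this module expects, inferred from the keys read
# below (enabled, name, geo, categories, type, url, api.page_size, api.max_pages,
# api.payload, path). Source names, URL, and values are illustrative, not copied
# from the real config/sources.yaml:
#
#   sources:
#     - name: grants_gov              # hypothetical entry
#       enabled: true
#       type: grantsgov_api
#       url: "https://api.grants.gov/v1/api/search2"   # example endpoint
#       geo: US
#       categories: [public_safety]
#       api:
#         page_size: 100
#         max_pages: 5
#         payload: {keyword: "community policing"}
#     - name: local_sample            # hypothetical entry
#       enabled: false
#       type: local_sample
#       path: data/sample_opportunities.json   # JSON with an "opportunities" list
#       geo: US
#       categories: []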

# -------------------- Grants.gov collector --------------------
def _collect_from_grantsgov_api(src: Dict) -> List[Dict[str, Any]]:
    """
    Calls the Grants.gov Search2 client and returns a list of RAW dicts
    (adapter may already be close to unified; we'll still run normalize()).
    """
    from app.sources.grantsgov_api import search_grants  # local import to avoid cycles

    api = src.get("api", {})
    page_size = int(api.get("page_size", src.get("page_size", 100)))
    max_pages = int(api.get("max_pages", src.get("max_pages", 5)))
    payload = api.get("payload", src.get("payload", {}))
    url = src.get("url", "")

    out = search_grants(url, payload, page_size=page_size, max_pages=max_pages)
    hits = out.get("hits", []) if isinstance(out, dict) else (out or [])
    return [h for h in hits if isinstance(h, dict)]
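
# For reference, a sketch of the unified record shape normalize() is expected to
# produce, inferred from the fields read by _build_index_from_docstore() below.
# The values are hypothetical placeholders, not real output:
#
#   {
#     "id": "ABC-123",
#     "title": "Example Opportunity",
#     "synopsis": "…",                 # or "summary"
#     "agency": "Example Agency",
#     "eligibility": "…",
#     "url": "https://example.org/opportunity/ABC-123",
#     "source": "grants_gov",
#     "geo": "US",
#     "categories": ["public_safety"],
#     "deadline": "2025-01-01",
#     "program_number": "00.000",
#     "posted_date": "2024-01-01",
#   }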

# -------------------- Write docstore & build index --------------------
def _save_docstore(recs: List[Dict[str, Any]]) -> str:
    DOCSTORE_DIR.mkdir(parents=True, exist_ok=True)
    path = DOCSTORE_DIR / "docstore.jsonl"
    with path.open("w", encoding="utf-8") as f:
        for r in recs:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")
    return str(path)

def _build_index_from_docstore() -> int:
    ds_path = DOCSTORE_DIR / "docstore.jsonl"
    if not ds_path.exists():
        raise RuntimeError("Docstore not found. Run ingest first.")

    # Load records → texts + metas
    texts: List[str] = []
    metas: List[Dict[str, Any]] = []
    with ds_path.open("r", encoding="utf-8") as f:
        for line in f:
            rec = json.loads(line)
            title = rec.get("title") or ""
            synopsis = rec.get("synopsis") or rec.get("summary") or ""
            agency = rec.get("agency") or ""
            eligibility = rec.get("eligibility") or ""
            txt = "\n".join([title, synopsis, agency, eligibility]).strip()
            if not txt:
                continue
            texts.append(txt)
            metas.append({
                "id": rec.get("id"),
                "title": title,
                "url": rec.get("url"),
                "source": rec.get("source"),
                "geo": rec.get("geo"),
                "categories": rec.get("categories"),
                "agency": agency,
                "deadline": rec.get("deadline"),
                "program_number": rec.get("program_number"),
                "posted_date": rec.get("posted_date"),
            })

    print(f"[index] Rows loaded from docstore: {len(texts)}")
    if not texts:
        # Write an empty meta.json so the downstream UI can still boot gracefully
        INDEX_DIR.mkdir(parents=True, exist_ok=True)
        (INDEX_DIR / "meta.json").write_text(json.dumps([], ensure_ascii=False))
        print("[index] No texts to embed. Wrote empty meta.json.")
        return 0

    # Embed (CPU default; keeps it portable)
    model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    model.max_seq_length = 256
    batch = max(8, min(32, len(texts)))  # sensible batch size for small corpora
    emb = model.encode(
        texts,
        convert_to_numpy=True,
        normalize_embeddings=True,
        show_progress_bar=True,
        batch_size=batch,
    ).astype(np.float32, copy=False)

    # FAISS index (Inner Product for cosine on normalized vectors)
    import faiss
    dim = emb.shape[1]
    index = faiss.IndexFlatIP(dim)
    index.add(emb)

    INDEX_DIR.mkdir(parents=True, exist_ok=True)
    faiss.write_index(index, str(INDEX_DIR / "faiss.index"))
    (INDEX_DIR / "meta.json").write_text(json.dumps(metas, ensure_ascii=False))
    print(f"[index] Wrote FAISS index with {emb.shape[0]} vectors (dim={dim}).")
    return len(texts)
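
# A minimal query-time sketch against the artifacts written above (faiss.index +
# meta.json), assuming the same embedding model and that cosine similarity is the
# intended metric (inner product on unit-normalized vectors). This helper is not
# part of the original module; its name and return shape are illustrative.
def _example_search(query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    import faiss  # local import, mirroring the build step above

    index = faiss.read_index(str(INDEX_DIR / "faiss.index"))
    metas = json.loads((INDEX_DIR / "meta.json").read_text(encoding="utf-8"))

    model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    q = model.encode(
        [query], convert_to_numpy=True, normalize_embeddings=True
    ).astype(np.float32, copy=False)

    scores, ids = index.search(q, top_k)  # inner product == cosine on unit vectors
    return [
        {**metas[int(i)], "score": float(s)}
        for s, i in zip(scores[0], ids[0])
        if i != -1  # FAISS pads missing results with -1
    ]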

# -------------------- Ingest main --------------------
def ingest(cfg_path: str = "config/sources.yaml", env: Dict | None = None):
    """
    Reads config, fetches from enabled sources, normalizes with a single map,
    attaches categories/geo consistently, DEDUPEs, and builds the index.
    """
    cfg = load_config(cfg_path)
    all_rows: List[Dict[str, Any]] = []

    for entry in cfg.get("sources", []):
        if not entry.get("enabled"):
            continue
        name = entry.get("name", "<source>")
        geo = entry.get("geo") or "US"
        cats = entry.get("categories") or []
        static = {"geo": geo, "categories": cats}
        typ = entry.get("type")

        rows: List[Dict[str, Any]] = []
        if typ == "grantsgov_api":
            raw_hits = _collect_from_grantsgov_api(entry)
            rows = [normalize("grants_gov", h, static) for h in raw_hits]
        elif typ == "local_sample":
            p = Path(entry["path"]).expanduser()
            blob = json.loads(p.read_text(encoding="utf-8"))
            items = blob.get("opportunities") or []
            rows = [normalize("local_sample", op, static) for op in items]
        else:
            # Future adapters (doj_ojp, state_md, web_page, json_static, …)
            rows = []

        print(f"[collect] {name} → {len(rows)} rows")
        all_rows.extend(rows)

    # ---- DEDUPE (id → url → title) ----
    seen, unique = set(), []
    for r in all_rows:
        key = r.get("id") or r.get("url") or r.get("title")
        if not key or key in seen:
            continue
        seen.add(key)
        unique.append(r)

    print(f"[ingest] Unique records to index: {len(unique)}")
    path = _save_docstore(unique)
    n = _build_index_from_docstore()
    return path, n

if __name__ == "__main__":
    import argparse

    ap = argparse.ArgumentParser()
    ap.add_argument("--config", default="config/sources.yaml")
    args = ap.parse_args()
    p, n = ingest(args.config)
    print(f"Ingested {n} records. Docstore at {p}")