# app/ingest.py
from __future__ import annotations

import json
from pathlib import Path
from typing import Dict, List, Any

import yaml
import numpy as np
from sentence_transformers import SentenceTransformer

from app.paths import DOCSTORE_DIR, INDEX_DIR
from .normalize import normalize  # central normalizer


# -------------------- Config --------------------
def load_config(cfg_path: str) -> Dict:
    with open(cfg_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)

# -------------------- Grants.gov collector --------------------
def _collect_from_grantsgov_api(src: Dict) -> List[Dict[str, Any]]:
    """
    Calls the Grants.gov Search2 client and returns a list of RAW dicts
    (the adapter may already be close to unified; we'll still run normalize()).
    """
    from app.sources.grantsgov_api import search_grants  # local import to avoid cycles

    api = src.get("api", {})
    page_size = int(api.get("page_size", src.get("page_size", 100)))
    max_pages = int(api.get("max_pages", src.get("max_pages", 5)))
    payload = api.get("payload", src.get("payload", {}))
    url = src.get("url", "")
    out = search_grants(url, payload, page_size=page_size, max_pages=max_pages)
    hits = out.get("hits", []) if isinstance(out, dict) else (out or [])
    return [h for h in hits if isinstance(h, dict)]

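# Illustrative call (the keys below are exactly what this function reads from a
# source entry; the URL and payload values are placeholders, not real settings):
#
#   src = {
#       "url": "https://api.grants.gov/v1/api/search2",
#       "api": {"page_size": 100, "max_pages": 5, "payload": {"keyword": "health"}},
#   }
#   hits = _collect_from_grantsgov_api(src)  # -> list of raw hit dicts
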
# -------------------- Write docstore & build index --------------------
def _save_docstore(recs: List[Dict[str, Any]]) -> str:
    DOCSTORE_DIR.mkdir(parents=True, exist_ok=True)
    path = DOCSTORE_DIR / "docstore.jsonl"
    with path.open("w", encoding="utf-8") as f:
        for r in recs:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")
    return str(path)

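# Each line of docstore.jsonl is one normalized record. A minimal illustrative
# line (values are made up; the keys are the ones _build_index_from_docstore reads):
#
#   {"id": "GG-12345", "title": "Example Program", "synopsis": "Short summary.",
#    "agency": "Example Agency", "eligibility": "Nonprofits", "url": "https://example.org/opp/12345",
#    "source": "grants_gov", "geo": "US", "categories": ["health"],
#    "deadline": "2025-01-31", "program_number": "93.123", "posted_date": "2024-11-01"}
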
def _build_index_from_docstore() -> int:
    ds_path = DOCSTORE_DIR / "docstore.jsonl"
    if not ds_path.exists():
        raise RuntimeError("Docstore not found. Run ingest first.")

    # Load records → texts + metas
    texts: List[str] = []
    metas: List[Dict[str, Any]] = []
    with ds_path.open("r", encoding="utf-8") as f:
        for line in f:
            rec = json.loads(line)
            title = rec.get("title") or ""
            synopsis = rec.get("synopsis") or rec.get("summary") or ""
            agency = rec.get("agency") or ""
            eligibility = rec.get("eligibility") or ""
            txt = "\n".join([title, synopsis, agency, eligibility]).strip()
            if not txt:
                continue
            texts.append(txt)
            metas.append({
                "id": rec.get("id"),
                "title": title,
                "url": rec.get("url"),
                "source": rec.get("source"),
                "geo": rec.get("geo"),
                "categories": rec.get("categories"),
                "agency": agency,
                "deadline": rec.get("deadline"),
                "program_number": rec.get("program_number"),
                "posted_date": rec.get("posted_date"),
            })
    print(f"[index] Rows loaded from docstore: {len(texts)}")

    if not texts:
        # Nothing to embed: write an empty meta.json so the downstream UI can still boot gracefully
        INDEX_DIR.mkdir(parents=True, exist_ok=True)
        (INDEX_DIR / "meta.json").write_text(json.dumps([], ensure_ascii=False))
        print("[index] No texts to embed. Wrote empty meta.json.")
        return 0

    # Embed (CPU default; keeps it portable)
    model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    model.max_seq_length = 256
    batch = max(8, min(32, len(texts)))  # sensible batch size for small corpora
    emb = model.encode(
        texts,
        convert_to_numpy=True,
        normalize_embeddings=True,
        show_progress_bar=True,
        batch_size=batch,
    ).astype(np.float32, copy=False)

    # FAISS index (Inner Product for cosine on normalized vectors)
    import faiss
    dim = emb.shape[1]
    index = faiss.IndexFlatIP(dim)
    index.add(emb)

    INDEX_DIR.mkdir(parents=True, exist_ok=True)
    faiss.write_index(index, str(INDEX_DIR / "faiss.index"))
    (INDEX_DIR / "meta.json").write_text(json.dumps(metas, ensure_ascii=False))
    print(f"[index] Wrote FAISS index with {emb.shape[0]} vectors (dim={dim}).")
    return len(texts)

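# Query-side sketch (illustrative; not part of this module). A retriever that
# consumes the artifacts written above would embed the query with the same
# MiniLM model and search the inner-product index, e.g.:
#
#   import faiss, json
#   from sentence_transformers import SentenceTransformer
#   index = faiss.read_index(str(INDEX_DIR / "faiss.index"))
#   metas = json.loads((INDEX_DIR / "meta.json").read_text())
#   model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
#   q = model.encode(["example query"], convert_to_numpy=True,
#                    normalize_embeddings=True).astype("float32")
#   scores, ids = index.search(q, 5)  # cosine scores, row indices into metas
#   top = [metas[i] for i in ids[0] if i != -1]
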
# -------------------- Ingest main --------------------
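# The function below relies only on the keys it reads from each source entry;
# an illustrative config/sources.yaml shape (names, URL, and paths are examples,
# not the project's real file):
#
#   sources:
#     - name: grants_gov_search2
#       type: grantsgov_api
#       enabled: true
#       geo: US
#       categories: [federal]
#       url: https://api.grants.gov/v1/api/search2
#       api:
#         page_size: 100
#         max_pages: 5
#         payload: {keyword: "community"}
#     - name: local_sample
#       type: local_sample
#       enabled: true
#       path: data/sample_opportunities.json
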
def ingest(cfg_path: str = "config/sources.yaml", env: Dict | None = None):
    """
    Reads config, fetches from enabled sources, normalizes with a single map,
    attaches categories/geo consistently, dedupes, and builds the index.
    """
    cfg = load_config(cfg_path)
    all_rows: List[Dict[str, Any]] = []

    for entry in cfg.get("sources", []):
        if not entry.get("enabled"):
            continue
        name = entry.get("name", "<source>")
        geo = entry.get("geo") or "US"
        cats = entry.get("categories") or []
        static = {"geo": geo, "categories": cats}
        typ = entry.get("type")

        rows: List[Dict[str, Any]] = []
        if typ == "grantsgov_api":
            raw_hits = _collect_from_grantsgov_api(entry)
            rows = [normalize("grants_gov", h, static) for h in raw_hits]
        elif typ == "local_sample":
            p = Path(entry["path"]).expanduser()
            blob = json.loads(p.read_text(encoding="utf-8"))
            items = blob.get("opportunities") or []
            rows = [normalize("local_sample", op, static) for op in items]
        else:
            # Future adapters (doj_ojp, state_md, web_page, json_static, …)
            rows = []

        print(f"[collect] {name} → {len(rows)} rows")
        all_rows.extend(rows)

    # ---- Dedupe (id → url → title) ----
    seen, unique = set(), []
    for r in all_rows:
        key = r.get("id") or r.get("url") or r.get("title")
        if not key or key in seen:
            continue
        seen.add(key)
        unique.append(r)

    print(f"[ingest] Unique records to index: {len(unique)}")
    path = _save_docstore(unique)
    n = _build_index_from_docstore()
    return path, n

if __name__ == "__main__":
    import argparse

    ap = argparse.ArgumentParser()
    ap.add_argument("--config", default="config/sources.yaml")
    args = ap.parse_args()
    p, n = ingest(args.config)
    print(f"Ingested {n} records. Docstore at {p}")