# app/ingest.py
from __future__ import annotations

import hashlib
import json
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import requests
import yaml
from bs4 import BeautifulSoup
from sentence_transformers import SentenceTransformer

from app.paths import DOCSTORE_DIR, INDEX_DIR
from .normalize import normalize  # central normalizer


# -------------------- Config --------------------

def load_config(cfg_path: str) -> Dict:
    with open(cfg_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)


# -------------------- Capacity / Geo Filters (config-driven) --------------------
# Controls live in config/sources.yaml:
#   filters:
#     capacity_only: true
#     pa_md_only: false

_INCLUDE_PATTERNS = [re.compile(p, re.I) for p in [
    r"\bcapacity(?:[-\s]?building)?\b",
    r"\btechnical\s+assistance\b",
    r"\bTA\b",
    r"\borganizational\s+(capacity|effectiveness|development|readiness|stabilization)\b",
    r"\borganization(?:al)?\s+infrastructure\b",
    r"\bback[-\s]?office\b|\bbackbone\s+organization\b",
    r"\bgovernance\b|\bboard\s+development\b|\bboard\s+training\b",
    r"\bpre[-\s]?development\b|\bpredevelopment\b|\bplanning\s+grant\b",
    r"\bdata\s+systems?\b|\bCRM\b|\bcase\s+management\b",
    r"\b(staff|workforce)\s+capacity\b|\bhire\s+(?:staff|positions?)\b",
    r"\bscal(?:e|ing)\s+capacity\b|\bexpand\s+capacity\b",
    r"\bnonprofit\b|\bfaith[-\s]?based\b|\bcommunity[-\s]?based\b",
    # broaden for transportation / human services
    r"\bprovider\s+capacity\b|\bservice\s+capacity\b|\borganizational\s+support\b",
]]

_EXCLUDE_PATTERNS = [re.compile(p, re.I) for p in [
    r"\bteaching\s+assistant\b|\bTAs\b",
    r"\bbench\s+capacity\b|\bmanufacturing\s+capacity\b(?!.*organiz)",
    r"\bclinical\s+trial\b|\blaboratory\s+capacity\b(?!.*community)",
    r"\b(postsecondary|university|college)\b(?!.*community\s+partner)",
    r"\bconstruction\b(?!.*(admin|organiz|back[-\s]?office|governance|systems))",
]]

_PA_MD_HINTS = re.compile(
    r"\b("
    r"Pennsylvania|PA\b|Harrisburg|Philadelphia|Allegheny|Montgomery County\b|Pittsburgh|Scranton|Erie|"
    r"Maryland|MD\b|Annapolis|Baltimore|Prince\s+George'?s|Howard County\b"
    r")\b",
    re.I,
)


def _doc_text_from_row(rec: Dict[str, Any]) -> str:
    title = rec.get("title") or ""
    synopsis = rec.get("synopsis") or rec.get("summary") or ""
    agency = rec.get("agency") or ""
    eligibility = rec.get("eligibility") or ""
    categories = " ".join(rec.get("categories") or []) if isinstance(rec.get("categories"), list) else (rec.get("categories") or "")
    geo = rec.get("geo") or ""
    return "\n".join([title, synopsis, agency, eligibility, categories, geo]).strip()


def _is_capacity_building_text(text: str) -> bool:
    if not text:
        return False
    if any(p.search(text) for p in _EXCLUDE_PATTERNS):
        return False
    return any(p.search(text) for p in _INCLUDE_PATTERNS)


def _is_pa_md_text(text: str) -> bool:
    if not text:
        return False
    return bool(_PA_MD_HINTS.search(text))
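
# Illustrative behaviour of the predicates above (hand-worked examples, not
# executed tests; the strings are made up for illustration):
#   _is_capacity_building_text("Planning grant for nonprofit board development")   -> True
#   _is_capacity_building_text("Manufacturing capacity expansion at a bench site") -> False (exclude pattern wins)
#   _is_pa_md_text("Open to providers serving Baltimore and Harrisburg")           -> True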

# -------------------- Deadline parsing helpers --------------------
# Common date shapes: "October 15, 2025", "Oct 15, 2025", "10/15/2025", "2025-10-15"
_MONTHS = {
    "january": 1, "jan": 1, "february": 2, "feb": 2, "march": 3, "mar": 3, "april": 4, "apr": 4,
    "may": 5, "june": 6, "jun": 6, "july": 7, "jul": 7, "august": 8, "aug": 8, "september": 9, "sep": 9, "sept": 9,
    "october": 10, "oct": 10, "november": 11, "nov": 11, "december": 12, "dec": 12,
}

# Capture label + date snippets
_DEADLINE_LINES = re.compile(
    r"(deadline|applications?\s+due|closing\s+date|due\s+date|submission\s+deadline)\s*[:\-]?\s*(.+)",
    re.I,
)

# Capture absolute dates
_DATE_ISO = re.compile(r"\b(20\d{2})[-/\.](\d{1,2})[-/\.](\d{1,2})\b")   # 2025-10-15 or 2025/10/15
_DATE_US = re.compile(r"\b(\d{1,2})[-/\.](\d{1,2})[-/\.](20\d{2})\b")    # 10/15/2025
_DATE_LONG = re.compile(
    r"\b([A-Za-z]{3,9})\s+(\d{1,2})(?:st|nd|rd|th)?,?\s+(20\d{2})\b", re.I)  # Oct 15, 2025 / October 15 2025


def _to_iso(y: int, m: int, d: int) -> Optional[str]:
    try:
        return datetime(y, m, d, tzinfo=timezone.utc).date().isoformat()
    except Exception:
        return None


def _parse_date_fragment(s: str) -> Optional[str]:
    s = (s or "").strip()
    # ISO first
    m = _DATE_ISO.search(s)
    if m:
        y, mm, dd = int(m.group(1)), int(m.group(2)), int(m.group(3))
        return _to_iso(y, mm, dd)
    # US 10/15/2025
    m = _DATE_US.search(s)
    if m:
        mm, dd, y = int(m.group(1)), int(m.group(2)), int(m.group(3))
        return _to_iso(y, mm, dd)
    # Long months
    m = _DATE_LONG.search(s)
    if m:
        mon = m.group(1).lower()
        dd = int(m.group(2))
        y = int(m.group(3))
        mm = _MONTHS.get(mon)
        if mm:
            return _to_iso(y, mm, dd)
    return None
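
# Hand-worked examples of the fragment parser (a sketch of expected outputs,
# not executed doctests):
#   _parse_date_fragment("2025-10-15")                           -> "2025-10-15"
#   _parse_date_fragment("10/15/2025")                           -> "2025-10-15"
#   _parse_date_fragment("Applications due October 15th, 2025")  -> "2025-10-15"
#   _parse_date_fragment("rolling basis")                        -> None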

def _extract_deadline(text: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Try to locate a deadline-like line, then parse a date.
    Returns (deadline_iso, raw_snippet) or (None, None).
    """
    if not text:
        return None, None
    # Look for labeled lines first (strong signal)
    for line in text.splitlines():
        m = _DEADLINE_LINES.search(line)
        if m:
            iso = _parse_date_fragment(m.group(2))
            if iso:
                return iso, line.strip()
    # Fallback: scan whole text for any date (first match)
    iso = _parse_date_fragment(text)
    if iso:
        return iso, None
    return None, None


def _compute_is_active(deadline_iso: Optional[str]) -> bool:
    if not deadline_iso:
        return True  # treat unknown/TBD as active
    try:
        # Compare against an aware "today" (datetime.utcnow() is deprecated in Python 3.12+).
        return datetime.fromisoformat(deadline_iso).date() >= datetime.now(timezone.utc).date()
    except Exception:
        return True
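
# How the two helpers compose (illustrative sketch; assumes "today" falls
# before 2025-10-15 UTC):
#   iso, raw = _extract_deadline("Submission deadline: October 15, 2025")
#   # iso == "2025-10-15"; raw is the matched line
#   _compute_is_active(iso)    # True through 2025-10-15 (UTC), False afterwards
#   _compute_is_active(None)   # True -- unknown/TBD deadlines stay active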

# -------------------- Grants.gov collector --------------------

def _collect_from_grantsgov_api(src: Dict) -> List[Dict[str, Any]]:
    """
    Calls the Grants.gov Search2 client and returns a list of RAW dicts
    (adapter may already be close to unified; we'll still run normalize()).
    """
    from app.sources.grantsgov_api import search_grants  # local import to avoid cycles

    api = src.get("api", {})
    page_size = int(api.get("page_size", src.get("page_size", 100)))
    max_pages = int(api.get("max_pages", src.get("max_pages", 5)))
    payload = api.get("payload", src.get("payload", {}))
    url = src.get("url", "")
    out = search_grants(url, payload, page_size=page_size, max_pages=max_pages)
    hits = out.get("hits", []) if isinstance(out, dict) else (out or [])
    return [h for h in hits if isinstance(h, dict)]
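
# A config/sources.yaml entry this collector expects might look roughly like
# the sketch below. The endpoint URL and payload fields are assumptions about
# the Search2 request shape, not a documented contract of app.sources.grantsgov_api:
#
#   - name: grants_gov
#     type: grantsgov_api
#     enabled: true
#     url: "<grants.gov Search2 endpoint>"
#     geo: "US"
#     categories: ["capacity building"]
#     api:
#       page_size: 100
#       max_pages: 5
#       payload:
#         keyword: "capacity building"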

# -------------------- NEW: Generic HTML / PDF collectors --------------------
_HTTP_HEADERS = {
    "User-Agent": "grants-rag/1.0 (+https://example.local) requests",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}


def _http_get(url: str, timeout: int = 20) -> Optional[requests.Response]:
    try:
        r = requests.get(url, headers=_HTTP_HEADERS, timeout=timeout)
        if r.status_code == 200 and r.content:
            return r
    except requests.RequestException:
        return None
    return None


def _soup(html: str) -> BeautifulSoup:
    # Prefer lxml for robustness; fall back to the stdlib parser if it is not installed.
    try:
        return BeautifulSoup(html, "lxml")
    except Exception:
        return BeautifulSoup(html, "html.parser")

def _text_from_soup(s: BeautifulSoup, selectors: Optional[List[str]] = None) -> Tuple[str, str]:
    """
    Returns (title, text). Uses selectors if provided;
    falls back to common content containers.
    """
    title = s.title.string.strip() if s.title and s.title.string else ""
    nodes = []
    if selectors:
        for css in selectors:
            nodes.extend(s.select(css) or [])
    if not nodes:
        for css in ("main", "article", "#content", ".content", "[role='main']"):
            nodes.extend(s.select(css) or [])
    if not nodes:
        nodes = [s.body] if s.body else []
    parts: List[str] = []
    for n in nodes:
        if not n:
            continue
        txt = n.get_text(separator="\n", strip=True)
        if txt:
            parts.append(txt)
    body = "\n\n".join(parts).strip()
    return title, body


def _make_id(*fields: str) -> str:
    h = hashlib.sha1()
    for f in fields:
        if f:
            h.update(f.encode("utf-8", "ignore"))
        h.update(b"|")
    return h.hexdigest()


def _normalize_web_record(
    source_name: str,
    url: str,
    title: str,
    body: str,
    static: Dict[str, Any],
    extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Produce a record shaped like normalize() output so downstream stays unchanged.
    Adds parsed deadline + is_active when available.
    """
    extra = extra or {}
    # If caller didn't pass a deadline, try to parse from body/title here.
    deadline_iso = extra.get("deadline")
    deadline_text = extra.get("deadline_text")
    if not deadline_iso:
        # Try parsing from body first, then from title.
        deadline_iso, deadline_text = _extract_deadline(body)
        if not deadline_iso and title:
            deadline_iso, _ = _extract_deadline(title)
    rec = {
        "id": extra.get("id") or _make_id(url, title or body[:160]),
        "title": title or extra.get("title") or url,
        "synopsis": (body or "")[:2000],  # clip; embeddings use title+synopsis later
        "summary": None,
        "url": url,
        "source": source_name,
        "geo": static.get("geo"),
        "categories": static.get("categories"),
        "agency": extra.get("agency", ""),
        "eligibility": extra.get("eligibility", ""),
        # Store ISO (YYYY-MM-DD) in 'deadline' for consistency
        "deadline": deadline_iso,
        "deadline_text": deadline_text,  # keep the raw line we matched (if any)
        "program_number": extra.get("program_number"),
        "posted_date": extra.get("posted_date"),
    }
    rec["is_active"] = _compute_is_active(rec["deadline"])
    return rec

def _collect_from_http_html(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Supports types: 'web_page' and 'http_html'
    Config keys supported:
      - url (str)
      - parse: { follow_links: bool, link_selectors: [..], content_selectors: [..] }
      - crawl: { schedule: "...", max_depth: int }   # max_depth 0/None = only landing
    """
    url = entry.get("url")
    if not url:
        return []
    r = _http_get(url)
    if not r:
        return []
    s = _soup(r.text)
    parse = entry.get("parse", {}) or entry.get("extract", {}) or {}
    content_selectors = parse.get("content_selectors") or []
    title, body = _text_from_soup(s, content_selectors)
    rows = []
    rows.append(_normalize_web_record(source_name, url, title, body, static, extra={"posted_date": None}))

    # follow links?
    follow = bool(parse.get("follow_links"))
    link_selectors = parse.get("link_selectors") or []
    crawl = entry.get("crawl", {}) or {}
    max_depth = int(crawl.get("max_depth", 0) or 0)
    visited = set([url])

    def _enq_links(soup: BeautifulSoup) -> List[str]:
        if link_selectors:
            links = []
            for sel in link_selectors:
                for a in soup.select(sel) or []:
                    href = a.get("href")
                    if href and href.startswith("http"):
                        links.append(href)
            out, seen = [], set()
            for h in links:
                if h not in seen:
                    out.append(h)
                    seen.add(h)
            return out[:40]  # polite cap
        return []

    if follow and max_depth > 0:
        frontier = _enq_links(s)
        depth = 1
        while frontier and depth <= max_depth and len(rows) < 200:
            next_frontier = []
            for link in frontier:
                if link in visited:
                    continue
                visited.add(link)
                rr = _http_get(link)
                if not rr:
                    continue
                ss = _soup(rr.text)
                t2, b2 = _text_from_soup(ss, content_selectors)
                if b2:
                    rows.append(_normalize_web_record(source_name, link, t2, b2, static, extra={"posted_date": None}))
                if depth < max_depth:
                    next_frontier.extend(_enq_links(ss))
                time.sleep(0.1)  # gentle
            frontier = next_frontier
            depth += 1
    return rows
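
# Example source entry for this collector (a sketch; the URL and CSS selectors
# are placeholders for whatever the target site actually uses):
#
#   - name: example_state_portal
#     type: web_page
#     enabled: true
#     url: "https://example.org/grants"
#     geo: "PA"
#     categories: ["capacity building"]
#     parse:
#       follow_links: true
#       link_selectors: ["a.grant-listing"]
#       content_selectors: ["main", "article"]
#     crawl:
#       max_depth: 1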

def _collect_from_http_pdf(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    type: 'http_pdf'
    keys:
      - url (single PDF fetch)
    """
    url = entry.get("url")
    if not url:
        return []
    try:
        from pdfminer.high_level import extract_text  # lazy import
    except Exception:
        return []
    rows = []
    r = _http_get(url, timeout=40)
    if not r:
        return rows
    tmp = DOCSTORE_DIR / (hashlib.sha1(url.encode("utf-8")).hexdigest() + ".pdf")
    try:
        DOCSTORE_DIR.mkdir(parents=True, exist_ok=True)
        tmp.write_bytes(r.content)
        body = extract_text(str(tmp)) or ""
    finally:
        try:
            tmp.unlink(missing_ok=True)
        except Exception:
            pass
    title = entry.get("name") or "PDF Document"
    rows.append(_normalize_web_record(source_name, url, title, body, static, extra={"posted_date": None}))
    return rows
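
# Minimal http_pdf entry (a sketch; requires pdfminer.six at runtime, and the
# URL is a placeholder):
#
#   - name: example_guidelines_pdf
#     type: http_pdf
#     enabled: true
#     url: "https://example.org/grant-guidelines.pdf"
#     geo: "MD"
#     categories: ["capacity building"]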

# -------------------- Write docstore & build index --------------------

def _save_docstore(recs: List[Dict[str, Any]]) -> str:
    DOCSTORE_DIR.mkdir(parents=True, exist_ok=True)
    path = DOCSTORE_DIR / "docstore.jsonl"
    with path.open("w", encoding="utf-8") as f:
        for r in recs:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")
    return str(path)


def _build_index_from_docstore() -> int:
    ds_path = DOCSTORE_DIR / "docstore.jsonl"
    if not ds_path.exists():
        raise RuntimeError("Docstore not found. Run ingest first.")
    texts: List[str] = []
    metas: List[Dict[str, Any]] = []
    with ds_path.open("r", encoding="utf-8") as f:
        for line in f:
            rec = json.loads(line)
            title = rec.get("title") or ""
            synopsis = rec.get("synopsis") or rec.get("summary") or ""
            agency = rec.get("agency") or ""
            eligibility = rec.get("eligibility") or ""
            txt = "\n".join([title, synopsis, agency, eligibility]).strip()
            if not txt:
                continue
            texts.append(txt)
            metas.append({
                "id": rec.get("id"),
                "title": title,
                "url": rec.get("url"),
                "source": rec.get("source"),
                "geo": rec.get("geo"),
                "categories": rec.get("categories"),
                "agency": agency,
                "deadline": rec.get("deadline"),  # ISO if available
                "deadline_text": rec.get("deadline_text"),
                "is_active": rec.get("is_active"),
                "program_number": rec.get("program_number"),
                "posted_date": rec.get("posted_date"),
            })
    print(f"[index] Rows loaded from docstore: {len(texts)}")
    if not texts:
        INDEX_DIR.mkdir(parents=True, exist_ok=True)
        (INDEX_DIR / "meta.json").write_text(json.dumps([], ensure_ascii=False))
        print("[index] No texts to embed. Wrote empty meta.json.")
        return 0

    # Embed (CPU default; portable)
    model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    model.max_seq_length = 256
    batch = max(8, min(32, len(texts)))
    emb = model.encode(
        texts,
        convert_to_numpy=True,
        normalize_embeddings=True,
        show_progress_bar=True,
        batch_size=batch,
    ).astype(np.float32, copy=False)

    # FAISS index (Inner Product for cosine on normalized vectors)
    import faiss

    dim = emb.shape[1]
    index = faiss.IndexFlatIP(dim)
    index.add(emb)
    INDEX_DIR.mkdir(parents=True, exist_ok=True)
    faiss.write_index(index, str(INDEX_DIR / "faiss.index"))
    (INDEX_DIR / "meta.json").write_text(json.dumps(metas, ensure_ascii=False))
    print(f"[index] Wrote FAISS index with {emb.shape[0]} vectors (dim={dim}).")
    return len(texts)
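
# A retriever elsewhere in the app is expected to consume these two artifacts
# roughly as sketched below (illustrative only; the real search code and its
# function names live outside this module):
#
#   import json, faiss
#   from sentence_transformers import SentenceTransformer
#   index = faiss.read_index(str(INDEX_DIR / "faiss.index"))
#   metas = json.loads((INDEX_DIR / "meta.json").read_text())
#   model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
#   q = model.encode(["board development planning grant"],
#                    convert_to_numpy=True, normalize_embeddings=True).astype("float32")
#   scores, ids = index.search(q, 5)              # inner product == cosine here
#   top = [metas[i] for i in ids[0] if i != -1]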

# -------------------- Public API: ingest --------------------
__all__ = ["ingest"]


def ingest(cfg_path: str = "config/sources.yaml", env: Dict | None = None):
    """
    Reads config, fetches from enabled sources via adapters, normalizes to a single schema,
    applies filters (capacity / PA-MD), dedupes, writes docstore, and builds the FAISS index.
    Returns (docstore_path, n_indexed).
    """
    cfg = load_config(cfg_path)

    # ---- Filters from config ----
    f_cfg = (cfg or {}).get("filters", {}) or {}
    capacity_only = bool(f_cfg.get("capacity_only", True))
    pa_md_only = bool(f_cfg.get("pa_md_only", False))
    print(f"[filters] capacity_only = {'TRUE' if capacity_only else 'FALSE'}")
    print(f"[filters] pa_md_only = {'TRUE' if pa_md_only else 'FALSE'}")

    all_rows: List[Dict[str, Any]] = []
    for entry in cfg.get("sources", []):
        if not entry.get("enabled"):
            continue
        name = entry.get("name", "<source>")
        geo = entry.get("geo") or "US"
        cats = entry.get("categories") or []
        static = {"geo": geo, "categories": cats}
        typ = entry.get("type")

        rows: List[Dict[str, Any]] = []
        if typ == "grantsgov_api":
            raw_hits = _collect_from_grantsgov_api(entry)
            rows = [normalize("grants_gov", h, static) for h in raw_hits]
        elif typ in ("web_page", "http_html"):
            rows = _collect_from_http_html(entry, name, static)
        elif typ == "http_pdf":
            rows = _collect_from_http_pdf(entry, name, static)
        elif typ == "local_sample":
            p = Path(entry["path"]).expanduser()
            blob = json.loads(p.read_text(encoding="utf-8"))
            items = blob.get("opportunities") or []
            rows = [normalize("local_sample", op, static) for op in items]

        # ---- Apply capacity / geo filters per source, before aggregating ----
        if rows and (capacity_only or pa_md_only):
            filtered = []
            for r in rows:
                t = _doc_text_from_row(r)
                if capacity_only and not _is_capacity_building_text(t):
                    continue
                if pa_md_only and not _is_pa_md_text(t):
                    continue
                filtered.append(r)
            print(f"[filter] {name}: kept {len(filtered)}/{len(rows)} after filters")
            rows = filtered

        print(f"[collect] {name} -> {len(rows)} rows")
        all_rows.extend(rows)

    # ---- DEDUPE (by id -> url -> title) ----
    seen, unique = set(), []
    for r in all_rows:
        key = r.get("id") or r.get("url") or r.get("title")
        if not key or key in seen:
            continue
        seen.add(key)
        unique.append(r)
    print(f"[ingest] Unique records to index: {len(unique)}")

    path = _save_docstore(unique)
    n = _build_index_from_docstore()
    return path, n

# -------------------- CLI --------------------
if __name__ == "__main__":
    import argparse

    ap = argparse.ArgumentParser()
    ap.add_argument("--config", default="config/sources.yaml")
    args = ap.parse_args()
    p, n = ingest(args.config)
    print(f"Ingested {n} records. Docstore at {p}")
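
# Typical invocation (run from the repo root so the relative import of
# .normalize resolves):
#   python -m app.ingest --config config/sources.yaml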