# app/sources/grantsgov_api.py from __future__ import annotations from typing import Dict, List, Any, Optional from datetime import datetime import requests # Official Grants.gov Search2 endpoint (JSON POST) API_URL = "https://api.grants.gov/v1/api/search2" def _coerce_pipe(v: Any) -> str: """Accept list/tuple/set/str/None and return pipe-delimited string.""" if v is None: return "" if isinstance(v, (list, tuple, set)): return "|".join([str(x) for x in v if x]) return str(v) def _first(x: Any) -> Optional[str]: if isinstance(x, (list, tuple)) and x: return str(x[0]) return str(x) if x is not None else None def _parse_date(d: Any) -> Optional[str]: """Return YYYY-MM-DD or None (be tolerant to formats).""" if not d: return None s = str(d) # common formats seen in the API for fmt in ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f"): try: return datetime.strptime(s, fmt).date().isoformat() except Exception: pass try: return datetime.fromisoformat(s).date().isoformat() except Exception: return None # Map common config keys → API keys so older configs still work _KEY_MAP = { "opportunityStatuses": "oppStatuses", "agencyCodes": "agencies", "agencies": "agencies", "alns": "aln", } def _remap_payload_keys(payload: Dict[str, Any]) -> Dict[str, Any]: out = dict(payload or {}) for k, v in list(out.items()): if k in _KEY_MAP: out[_KEY_MAP[k]] = v return out def search_grants( _unused_url: str, payload: Dict[str, Any], page_size: int = 100, max_pages: int = 10, timeout: int = 30, ) -> Dict[str, Any]: """ Calls Grants.gov Search2 API with pagination and returns normalized results: Returns: { "hits": [ { unified schema per record }, ... ], "hitCount": int } """ all_hits: List[Dict[str, Any]] = [] start = 0 pages = 0 hit_count: Optional[int] = None # Bridge payload keys and coerce to API expectations payload = _remap_payload_keys(payload or {}) keyword = payload.get("keyword", "") or payload.get("keywords", "") oppNum = payload.get("oppNum", "") eligibilities = _coerce_pipe(payload.get("eligibilities", "")) agencies = _coerce_pipe(payload.get("agencies", "")) oppStatuses = _coerce_pipe(payload.get("oppStatuses", "")) or "forecasted|posted" aln = _coerce_pipe(payload.get("aln", "")) fundingCategories = _coerce_pipe(payload.get("fundingCategories", "")) session = requests.Session() headers = {"Content-Type": "application/json"} while pages < max_pages: req_body = { "rows": page_size, "startRecordNum": start, # pagination "keyword": keyword, "oppNum": oppNum, "eligibilities": eligibilities, "agencies": agencies, "oppStatuses": oppStatuses, "aln": aln, "fundingCategories": fundingCategories, } resp = session.post(API_URL, json=req_body, headers=headers, timeout=timeout) resp.raise_for_status() j = resp.json() or {} data = j.get("data") or {} if hit_count is None: try: hit_count = int(data.get("hitCount", 0)) except Exception: hit_count = 0 opp_hits = data.get("oppHits") or [] if not opp_hits: break # ---- Normalize each record to unified schema ---- for h in opp_hits: gg_id = h.get("id") num = h.get("number") aln_list = h.get("alnist", []) or [] norm = { # unified schema (stable id avoids duplicates across configs) "id": f"gg:{num or gg_id}", "source": "grants.gov", "title": h.get("title"), "agency": h.get("agencyName") or h.get("agencyCode"), "program_number": _first(aln_list), # Assistance Listing (ALN/CFDA) "posted_date": _parse_date(h.get("openDate")), "deadline": _parse_date(h.get("closeDate")), "synopsis": h.get("synopsis") or h.get("summary"), "location_scope": ["US"], # Grants.gov is US-wide by default "tags": [], # to be extended by ingest with config categories "url": f"https://www.grants.gov/search-results-detail/{gg_id}" if gg_id else None, "raw": h, # keep full source blob for traceability } # Optional award fields if present (keep None if absent) if "awardFloor" in h: norm["award_floor"] = h.get("awardFloor") if "awardCeiling" in h: norm["award_ceiling"] = h.get("awardCeiling") if "expectedNumberOfAwards" in h: norm["expected_awards"] = h.get("expectedNumberOfAwards") if "eligibility" in h: norm["eligibility"] = h.get("eligibility") all_hits.append(norm) got = len(opp_hits) start += got pages += 1 if hit_count is not None and start >= hit_count: break return {"hits": all_hits, "hitCount": hit_count or 0}