Spaces:
Running
Running
from typing import Any, Dict, Callable, Optional | |
from datetime import datetime | |
def _iso(d: Any) -> Optional[str]: | |
if not d: | |
return None | |
s = str(d) | |
for fmt in ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f"): | |
try: | |
return datetime.strptime(s, fmt).date().isoformat() | |
except Exception: | |
pass | |
try: | |
return datetime.fromisoformat(s).date().isoformat() | |
except Exception: | |
return None | |
def _first(x: Any) -> Any: | |
return (x[0] if isinstance(x, (list, tuple)) and x else x) | |
def _list(x: Any) -> list: | |
if x is None: | |
return [] | |
if isinstance(x, list): | |
return x | |
if isinstance(x, (set, tuple)): | |
return list(x) | |
return [x] | |
# Registry of per-source mapper functions, keyed by source name. Each mapper
# takes a raw record dict and returns it in the unified schema.
MAPPERS: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {}


def mapper(name: str):
    """Build a decorator that files a mapper function under ``name``.

    The decorated function is stored in ``MAPPERS[name]`` (replacing any
    entry already stored under that key) and is returned unchanged.
    """

    def register(func: Callable[[Dict[str, Any]], Dict[str, Any]]):
        MAPPERS[name] = func
        return func

    return register
def _map_grants_gov(h: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one raw grants.gov record into the unified schema.

    For most fields a source-style key (e.g. ``agencyName``, ``openDate``)
    is tried first and the unified key (e.g. ``agency``, ``posted_date``)
    is the fallback. The incoming dict is kept untouched under ``"raw"``.

    Args:
        h: Raw record dict. NOTE(review): presumably one search hit from the
            grants.gov API — verify against the caller.

    Returns:
        A new dict in the unified schema. The award/eligibility keys are
        included only when the raw record carries a non-``None`` value.
    """
    opp_id = h.get("id")
    opp_number = h.get("number")
    aln_values = h.get("alnist") or h.get("aln") or []

    # An explicit URL wins; otherwise derive the detail-page link from the id.
    link = h.get("url")
    if not link:
        link = f"https://www.grants.gov/search-results-detail/{opp_id}" if opp_id else None

    record: Dict[str, Any] = {
        "id": f"gg:{opp_number or opp_id}",
        "source": "grants.gov",
        "title": h.get("title"),
        "agency": h.get("agencyName") or h.get("agencyCode") or h.get("agency"),
        "program_number": _first(aln_values) or h.get("program_number"),
        "posted_date": _iso(h.get("openDate") or h.get("posted_date")),
        "deadline": _iso(h.get("closeDate") or h.get("deadline")),
        "synopsis": h.get("synopsis") or h.get("summary"),
        "location_scope": h.get("location_scope") or ["US"],
        "tags": h.get("tags") or [],
        "url": link,
        "raw": h,
    }

    # Optional fields: copied only when present, with the unified-schema key
    # taking precedence over the source-style key.
    optional_fields = (
        ("awardFloor", "award_floor"),
        ("awardCeiling", "award_ceiling"),
        ("expectedNumberOfAwards", "expected_awards"),
        ("eligibility", "eligibility"),
    )
    for raw_key, unified_key in optional_fields:
        unified_value = h.get(unified_key)
        raw_value = h.get(raw_key)
        if unified_value is not None:
            record[unified_key] = unified_value
        elif raw_value is not None:
            record[unified_key] = raw_value
    return record
def _map_local_sample(op: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one record of the local sample source into the unified schema.

    Only the title, agency, synopsis, opportunity number and the two dates
    are read from ``op``; program number, URL, tags and location scope are
    fixed values. The incoming dict is kept untouched under ``"raw"``.
    """
    opportunity_number = op.get("opportunityNumber")
    posted = _iso(op.get("postedDate"))
    closes = _iso(op.get("closeDate"))

    record: Dict[str, Any] = {
        "id": f"sample:{opportunity_number}",
        "source": "sample_local",
        "title": op.get("opportunityTitle"),
        "agency": op.get("agency"),
        "program_number": None,
        "posted_date": posted,
        "deadline": closes,
        "synopsis": op.get("synopsis"),
        "location_scope": ["US"],
        "tags": [],
        "url": None,
        "raw": op,
    }
    return record
def normalize(source_key: str, raw: Dict[str, Any], static: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Map ``raw`` through the mapper registered for ``source_key`` and
    attach static metadata.

    Args:
        source_key: Key into ``MAPPERS`` selecting the mapper to run.
        raw: Raw record dict handed to that mapper.
        static: Optional extra metadata. A truthy ``"geo"`` value is copied
            onto the record; ``"categories"`` (a single value or a
            collection) is merged into the record's ``categories`` and
            mirrored into its ``tags``.

    Returns:
        The mapped record, with ``categories`` and ``tags`` de-duplicated in
        first-seen order.

    Raises:
        KeyError: If no mapper is registered under ``source_key``.
    """
    if source_key not in MAPPERS:
        raise KeyError(f"No mapper registered for {source_key!r}")
    rec = MAPPERS[source_key](raw)
    static = static or {}

    # attach geo
    if static.get("geo"):
        rec["geo"] = static["geo"]

    # attach categories and mirror into tags
    cats = _list(static.get("categories"))
    # Build a fresh list so a list shared with the raw record or with
    # ``static`` is never mutated, and so a None/scalar/tuple value coming
    # from a mapper is tolerated.
    categories = list(_list(rec.get("categories")))
    for c in cats:
        if c not in categories:
            categories.append(c)
    rec["categories"] = categories

    # dict.fromkeys de-duplicates while keeping first-seen order; going
    # through set() made the tag order vary from run to run.
    rec["tags"] = list(dict.fromkeys(_list(rec.get("tags")) + cats))
    return rec