# genesis/funding.py
"""
Global Funding Intelligence for GENESIS-AI

Fetches worldwide biotech & synthetic biology grant opportunities.
"""
import logging

import requests

logging.basicConfig(level=logging.INFO)

NIH_FUNDING_API = "https://api.reporter.nih.gov/v2/projects/search"
EU_CORDIS_API = "https://cordis.europa.eu/api/project"  # Horizon Europe
WELLCOME_TRUST_API = "https://api.wellcome.org/grants"  # Example placeholder

# Seconds to wait on any single funding API call before giving up.
REQUEST_TIMEOUT = 30


def _fetch_json(source, send, url, **request_kwargs):
    """Perform one HTTP call and return its decoded JSON body.

    Args:
        source: Human-readable API name, used only in the error log line.
        send: The ``requests`` callable to use (``requests.get`` or
            ``requests.post``).
        url: Endpoint to call.
        **request_kwargs: Passed through to ``send`` (e.g. ``params``, ``json``).

    Returns:
        The decoded JSON payload on success, or ``{"error": "<message>"}`` when
        the request, the HTTP status check, or the JSON decoding fails.
    """
    try:
        response = send(url, timeout=REQUEST_TIMEOUT, **request_kwargs)
        response.raise_for_status()
        return response.json()
    except Exception as exc:
        # Deliberately broad: the public getters are best-effort and report
        # every failure as an {"error": ...} payload instead of raising.
        logging.error("[Funding] %s API error: %s", source, exc)
        return {"error": str(exc)}


# -----------------------
# NIH Funding
# -----------------------
def get_nih_funding(query="synthetic biology", limit=10):
    """Fetch NIH funding opportunities.

    Args:
        query: Free-text search string.
        limit: Maximum number of records requested from the API.

    Returns:
        The decoded JSON response, or ``{"error": "<message>"}`` on failure.
    """
    # NOTE(review): the public RePORTER v2 docs describe text search under
    # `advanced_text_search`; confirm that `textCriteria` is honoured.
    payload = {
        "criteria": {"textCriteria": {"searchText": query}},
        "offset": 0,
        "limit": limit,
    }
    return _fetch_json("NIH", requests.post, NIH_FUNDING_API, json=payload)


# -----------------------
# EU Horizon Europe / CORDIS Funding
# -----------------------
def get_eu_funding(query="synthetic biology", limit=10):
    """Fetch Horizon Europe / EU CORDIS funding calls.

    Args:
        query: Free-text search string.
        limit: Maximum number of records requested from the API.

    Returns:
        The decoded JSON response, or ``{"error": "<message>"}`` on failure.
    """
    # `params` lets requests URL-encode the values, so characters such as
    # '&', '#' or '=' inside the query cannot corrupt the query string.
    return _fetch_json(
        "EU CORDIS",
        requests.get,
        EU_CORDIS_API,
        params={"q": query, "num": limit},
    )


# -----------------------
# Wellcome Trust Grants
# -----------------------
def get_wellcome_funding(query="synthetic biology"):
    """Fetch Wellcome Trust grants.

    Args:
        query: Free-text search string.

    Returns:
        The decoded JSON response, or ``{"error": "<message>"}`` on failure.
    """
    # Encoded via `params` for the same reason as in get_eu_funding.
    return _fetch_json(
        "Wellcome Trust",
        requests.get,
        WELLCOME_TRUST_API,
        params={"query": query},
    )


# -----------------------
# Aggregator
# -----------------------
def get_global_funding(query="synthetic biology", limit=10):
    """Aggregate funding from all major sources.

    Args:
        query: Free-text search string sent to every source.
        limit: Maximum number of records requested from the sources that
            accept a limit (NIH and EU CORDIS).

    Returns:
        A dict with the keys ``"NIH"``, ``"EU_CORDIS"`` and
        ``"Wellcome_Trust"``, each holding that source's raw response (or its
        ``{"error": ...}`` payload).
    """
    return {
        "NIH": get_nih_funding(query, limit=limit),
        "EU_CORDIS": get_eu_funding(query, limit=limit),
        "Wellcome_Trust": get_wellcome_funding(query),
    }


# -----------------------
# Pipeline-Compatible Function
# -----------------------
def _first_value(record, *keys, default=""):
    """Return the first non-None value found under *keys*, else *default*."""
    for key in keys:
        value = record.get(key)
        if value is not None:
            return value
    return default


def _record_list(payload, *keys):
    """Return the dict items of the first list found under *keys* in *payload*.

    Anything that is not a dict payload, a list value, or a dict item is
    ignored, so a malformed response from one source yields no records instead
    of raising.
    """
    if not isinstance(payload, dict):
        return []
    for key in keys:
        items = payload.get(key)
        if isinstance(items, list):
            return [item for item in items if isinstance(item, dict)]
    return []


def _mock_funding_entries():
    """Return the placeholder entries used when no live data is available."""
    # "title" mirrors "company" so mock entries expose the same keys as live
    # entries; "company" is kept for callers that already read it.
    return [
        {
            "source": "Mock Funding API",
            "title": "Example Biotech Inc.",
            "company": "Example Biotech Inc.",
            "amount": "$5M",
            "date": "2024-05-10",
            "link": "https://example.com/funding/123",
        },
        {
            "source": "Mock Funding API",
            "title": "Synthetic Bio Solutions",
            "company": "Synthetic Bio Solutions",
            "amount": "$2.5M",
            "date": "2023-12-20",
            "link": "https://example.com/funding/456",
        },
    ]


def fetch_funding_data(query, max_results=10):
    """Return a flat list of funding entries for the pipeline.

    Queries NIH, EU CORDIS and Wellcome Trust and normalizes each record to a
    dict with the keys ``source``, ``title``, ``amount``, ``date`` and
    ``link``.

    Args:
        query: Free-text search string passed to every source.
        max_results: Limit forwarded to the NIH and EU CORDIS requests. It is
            applied per source, so the combined list may be longer.

    Returns:
        A list of entry dicts. When no source yields a record, mock entries
        are returned instead. If an unexpected error occurs while building
        the list, an empty list is returned.
    """
    logging.info("[Funding] Fetching global funding data for: %s", query)
    try:
        results = []

        # The public NIH RePORTER v2 docs describe the payload under
        # "results" with snake_case fields; the original camelCase names are
        # tried first so existing behavior is preserved.
        nih_data = get_nih_funding(query, limit=max_results)
        for proj in _record_list(nih_data, "projects", "results"):
            project_ref = _first_value(
                proj, "projectNumber", "appl_id", "project_num"
            )
            results.append({
                "source": "NIH",
                "title": _first_value(
                    proj, "projectTitle", "project_title", default="No title"
                ),
                "amount": _first_value(
                    proj, "awardAmount", "award_amount", default="Unknown"
                ),
                "date": _first_value(
                    proj, "projectStartDate", "project_start_date"
                ),
                "link": f"https://reporter.nih.gov/project-details/{project_ref}",
            })

        eu_data = get_eu_funding(query, limit=max_results)
        for proj in _record_list(eu_data, "projects"):
            results.append({
                "source": "EU CORDIS",
                "title": _first_value(proj, "title", default="No title"),
                "amount": _first_value(proj, "totalCost", default="Unknown"),
                "date": _first_value(proj, "startDate"),
                # NOTE(review): "rcn" looks like a record id rather than a
                # URL; confirm what consumers expect in "link".
                "link": _first_value(proj, "rcn"),
            })

        wellcome_data = get_wellcome_funding(query)
        for grant in _record_list(wellcome_data, "grants"):
            results.append({
                "source": "Wellcome Trust",
                "title": _first_value(grant, "title", default="No title"),
                "amount": _first_value(grant, "amountAwarded", default="Unknown"),
                "date": _first_value(grant, "awardDate"),
                "link": _first_value(grant, "url"),
            })

        # If no real data found, return mock data
        if not results:
            logging.warning("[Funding] No live API results — returning mock data.")
            results = _mock_funding_entries()

        return results
    except Exception as e:
        # Pipeline boundary: never let a parsing surprise crash the caller.
        logging.error("[Funding] fetch_funding_data failed: %s", e)
        return []