Spaces:
Paused
Paused
import os | |
import logging | |
from googleapiclient.discovery import build | |
from dotenv import load_dotenv | |
load_dotenv() | |
class GoogleSearch:
    """Performs Google Custom Search API queries.

    Credentials are read from the ``GOOGLE_API_KEY`` and ``GOOGLE_CSE_ID``
    environment variables. If either is missing, or building the client
    fails, ``service`` stays ``None`` and :meth:`search` returns an empty
    list instead of raising.
    """

    # The Custom Search JSON API only accepts ``num`` values from 1 to 10;
    # anything outside that range is rejected by the server.
    _MIN_RESULTS = 1
    _MAX_RESULTS = 10

    def __init__(self):
        """Read credentials from the environment and build the API client."""
        self.api_key = os.getenv("GOOGLE_API_KEY")
        self.cse_id = os.getenv("GOOGLE_CSE_ID")
        self.service = None

        if not (self.api_key and self.cse_id):
            logging.warning("⚠️ GOOGLE_API_KEY or GOOGLE_CSE_ID not set; search disabled.")
            return

        try:
            self.service = build("customsearch", "v1", developerKey=self.api_key)
            logging.info("✅ Google Custom Search initialized.")
        except Exception as e:
            # Best effort: a failed init leaves search disabled rather than
            # propagating the error to whoever constructs this object.
            logging.error("❌ CSE init failed: %s", e)

    def search(self, queries: list, num_results: int = 1) -> list:
        """Run each query against the configured search engine.

        Args:
            queries: Query strings to run. A single bare string is treated
                as one query rather than being iterated per character.
            num_results: Results requested per query. Clamped to the 1-10
                range the API accepts; ``None`` is passed through unchanged.

        Returns:
            One ``{"query": ..., "results": [...]}`` dict per query, in
            input order. Each result holds ``title``, ``link`` and
            ``snippet``. A query that fails is logged and gets an empty
            ``results`` list. Returns ``[]`` when search is disabled or no
            queries are given.
        """
        if not self.service or not queries:
            return []

        # Iterating a bare string would issue one API call per character.
        if isinstance(queries, str):
            queries = [queries]

        if num_results is not None:
            num_results = max(
                self._MIN_RESULTS, min(self._MAX_RESULTS, num_results)
            )

        out = []
        for q in queries:
            try:
                resp = (
                    self.service.cse()
                    .list(q=q, cx=self.cse_id, num=num_results)
                    .execute()
                )
                formatted = [
                    {
                        "title": it.get("title"),
                        "link": it.get("link"),
                        "snippet": it.get("snippet"),
                    }
                    for it in resp.get("items", [])
                ]
            except Exception as e:
                # One failing query must not abort the rest of the batch.
                logging.error("❌ Search error for '%s': %s", q, e)
                formatted = []
            out.append({"query": q, "results": formatted})
        return out
# Shared module-level instance, created once at import time. If
# GOOGLE_API_KEY or GOOGLE_CSE_ID is unset (or client construction fails),
# its `service` is None and `search()` returns an empty list.
google_search = GoogleSearch()