Spaces:
Sleeping
Sleeping
Essi
fix: update YouTube transcript retrieval method and clean up normalization logic in GAIAAgent
98f5036
import ast | |
import json | |
import operator | |
import os | |
import re | |
from functools import lru_cache | |
from io import BytesIO | |
from typing import TypedDict | |
from urllib import parse | |
import gradio as gr | |
import pandas as pd | |
import requests | |
from langchain.agents import tool | |
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper | |
from langchain_core.messages import HumanMessage, SystemMessage | |
from langchain_openai import ChatOpenAI | |
from langgraph.graph import END, StateGraph | |
from wikipedia import summary as wiki_summary | |
from youtube_transcript_api import YouTubeTranscriptApi | |
# --- Constants ---
# Base URL of the scoring service; the endpoints used in this file
# (/questions, /submit, /files/<task_id>) are built from it.
DEFAULT_API_URL: str = "https://agents-course-unit4-scoring.hf.space"

# Chat model used by the agent, and its sampling temperature.
OPENAI_MODEL_NAME: str = "gpt-4.1-mini-2025-04-14"
OPENAI_MODEL_TEMPERATURE: float = 0.1

# ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------
# --------------------------------------------------------------------------- # | |
# ----------------------------- SAFE CALCULATOR --------------------------- # | |
# --------------------------------------------------------------------------- # | |
_ALLOWED_OPS = { | |
ast.Add: operator.add, | |
ast.Sub: operator.sub, | |
ast.Mult: operator.mul, | |
ast.Div: operator.truediv, | |
ast.Pow: operator.pow, | |
ast.USub: operator.neg, | |
} | |
def _safe_eval(node: ast.AST) -> int | float | complex: | |
if isinstance(node, ast.Constant): # literal number | |
return node.n | |
if isinstance(node, ast.UnaryOp) and type(node.op) in _ALLOWED_OPS: | |
return _ALLOWED_OPS[type(node.op)](_safe_eval(node.operand)) | |
if isinstance(node, ast.BinOp) and type(node.op) in _ALLOWED_OPS: | |
return _ALLOWED_OPS[type(node.op)]( | |
_safe_eval(node.left), _safe_eval(node.right) | |
) | |
raise ValueError("Unsafe or unsupported expression") | |
def calculator(expression: str) -> str:
    """Evaluate an arithmetic expression without using ``eval``.

    The expression is parsed into an AST and handed to ``_safe_eval``. Any
    failure (syntax error, unsupported construct, arithmetic error, ...) is
    reported as a ``"calc_error:<message>"`` string instead of being raised.
    """
    try:
        parsed = ast.parse(expression, mode="eval")
        outcome = _safe_eval(parsed.body)
        # str() stays inside the try: converting a huge int can itself raise.
        return str(outcome)
    except Exception as exc:
        return f"calc_error:{exc}"
# --------------------------------------------------------------------------- # | |
# ----------------------------- WEB SEARCH --------------------------- # | |
# --------------------------------------------------------------------------- # | |
def _search_duckduckgo(query: str, k: int = 5) -> list[dict[str, str]]: | |
"""Returns the top-k DuckDuckGo results as a list of {title, snippet, link}. Caches identical queries.""" | |
try: | |
wrapper = DuckDuckGoSearchAPIWrapper(max_results=k) | |
raw = wrapper.results(query, max_results=k) | |
cleaned = [] | |
for hit in raw[:k]: | |
cleaned.append( | |
{ | |
"title": hit.get("title", "")[:120], | |
"snippet": hit.get("snippet", "")[:200], | |
"link": hit.get("link", "")[:200], | |
} | |
) | |
return cleaned | |
except Exception as e: | |
print(f"Search error: {e}") | |
return [] | |
def web_multi_search(query: str, k: int = 5) -> str:
    """Multi-backend search returning a JSON list of {title, snippet, link}.

    Backends are tried in order until one yields something:

    1. DuckDuckGo (via ``_search_duckduckgo``), up to ``k`` hits.
    2. Wikipedia: a two-sentence summary of the article named ``query``.
    3. The DuckDuckGo "lite" page fetched through the r.jina.ai /
       allorigins proxies, reduced to the first 600 characters of text.

    Args:
        query: Search query.
        k: Maximum number of DuckDuckGo hits.

    Returns:
        A JSON-encoded list of hits, or a ``"search_error:<reason>"`` string
        when the query is empty or every backend failed.
    """
    query = (query or "").strip()
    if not query:
        return "search_error:empty query"

    # 1) DuckDuckGo
    try:
        hits = _search_duckduckgo(query, k)
        if hits:
            return json.dumps(hits, ensure_ascii=False)
    except Exception as e:  # best effort: report and try the next backend
        print(f"DuckDuckGo search failed: {e}")

    # 2) Wikipedia single-article summary
    try:
        page = wiki_summary(query, sentences=2, auto_suggest=False)
        if page:
            return json.dumps(
                [{"title": "Wikipedia", "snippet": page, "link": ""}],
                ensure_ascii=False,
            )
    except Exception as e:  # e.g. page not found / disambiguation
        print(f"Wikipedia fallback failed: {e}")

    # 3) DuckDuckGo lite through a proxy - tiny quota but better than nothing.
    try:
        # The query is encoded once for the DuckDuckGo URL and the whole
        # target URL once more as the proxy's `url` parameter; with a single
        # level of encoding characters such as '&', '+' or '#' in the query
        # would be reinterpreted after the proxy decodes its parameter.
        target = "https://lite.duckduckgo.com/lite/?" + parse.urlencode({"q": query})
        url = "https://r.jina.ai/http://api.allorigins.win/raw?url=" + parse.quote(
            target, safe=""
        )
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        # Strip tags first, then cap, so the cap never cuts a tag in half.
        snippet = re.sub(r"<.*?>", "", response.text)[:600]
        # NOTE(review): the "Google-lite" title is historical; the backend is
        # DuckDuckGo lite. Kept unchanged in case something matches on it.
        return json.dumps(
            [{"title": "Google-lite", "snippet": snippet, "link": ""}],
            ensure_ascii=False,
        )
    except Exception as e:
        return f"search_error:{e}"
# Matches the 11-character video id in the common YouTube URL shapes:
# watch?v=<id>, youtu.be/<id>, /embed/<id>, /shorts/<id> and /live/<id>.
_YOUTUBE_ID_RE = re.compile(
    r"(?:v=|youtu\.be/|/(?:embed|shorts|live)/)([A-Za-z0-9_\-]{11})"
)


def youtube_transcript(url: str, num_first_chars: int = 10_000) -> str:
    """Return the YouTube transcript (first `num_first_chars` characters only).

    Args:
        url: A YouTube URL (or any text containing one) from which the video
            id can be extracted.
        num_first_chars: Maximum number of transcript characters to return;
            negative values are treated as 0.

    Returns:
        The transcript snippets joined by single spaces, ``"yt_error: id"``
        when no video id is found, or ``"yt_error: <message>"`` when fetching
        the transcript fails.
    """
    video_id = _YOUTUBE_ID_RE.search(url or "")
    if not video_id:
        return "yt_error: id"
    try:
        ytt_api = YouTubeTranscriptApi()
        fetched_transcript = ytt_api.fetch(video_id=video_id.group(1)).to_raw_data()
        transcript_str = " ".join(x["text"] for x in fetched_transcript)
        # A negative limit would otherwise slice characters off the end.
        return transcript_str[: max(num_first_chars, 0)]
    except Exception as e:
        return f"yt_error: {e}"
# --------------------------------------------------------------------------- # | |
# HELPER FUNCTIONS # | |
# --------------------------------------------------------------------------- # | |
def _needs_calc(q: str) -> bool: | |
"""Check if question is purely mathematical.""" | |
math_expr = re.compile(r"^\s*[\d\.\s\+\-\*/\(\)]+?\s*$") | |
return bool(math_expr.match(q)) | |
def _extract_search_terms(question: str) -> str: | |
key = re.findall(r"[A-Za-z0-9']+", question.lower()) | |
phrase = " ".join(key) | |
# if we lost critical tokens (length diff > 40 %), fallback to full q | |
return phrase if len(phrase) > 0.6 * len(question) else question | |
def _summarize_results(results_json: str, max_hits: int = 3) -> str: | |
"""Turn JSON list of hits into a compact text context for the LLM.""" | |
if not results_json or not results_json.lstrip().startswith("["): | |
# Not JSON or empty → return raw text | |
return results_json | |
try: | |
hits = json.loads(results_json)[:max_hits] | |
context_parts = [] | |
for i, h in enumerate(hits, 1): | |
title = h.get("title", "") | |
snippet = h.get("snippet", "") | |
if title or snippet: | |
context_parts.append(f"{i}. {title}: {snippet}") | |
return "\n".join(context_parts) | |
except Exception as e: | |
print(f"Error summarizing results: {e}") | |
return "" | |
# --------------------------------------------------------------------------- # | |
# ------------------------------- AGENT STATE ----------------------------- # | |
# --------------------------------------------------------------------------- # | |
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the agent graph."""

    # Task identifier; used to build the attachment download URL.
    task_id: str
    # The question text being answered.
    question: str
    # Current answer; an empty string means "not answered yet".
    answer: str
    # Raw output of the web search tool.
    search_results: str
    # Text handed to the LLM as context (search summary or transcript).
    context: str
    # Trace of the steps taken, printed after each run.
    reasoning_steps: list[str]
    # Names of the tools invoked while answering.
    tools_used: list[str]
# --------------------------------------------------------------------------- # | |
# ------------------------------- GAIA AGENT ------------------------------ # | |
# --------------------------------------------------------------------------- # | |
class GAIAAgent:
    """LangGraph-powered agent targeting GAIA Level-1 tasks.

    The workflow is a fixed linear pipeline::

        analyze_question -> route -> process_info -> generate_answer
        -> normalize_answer

    ``route`` picks one tool (calculator, Excel attachment, YouTube transcript
    or web search); the LLM is only invoked when no tool produced the answer
    directly.
    """

    SYSTEM_PROMPT = """You are an expert research assistant that provides accurate, concise answers.
IMPORTANT INSTRUCTIONS:
1. Answer with ONLY the specific information requested - no extra explanation
2. For numerical answers, provide just the number
3. For names, provide just the name(s)
4. For yes/no questions, answer "Yes" or "No"
5. Use the provided context carefully to find the exact answer
6. Be precise and factual
Return ONLY the final answer."""

    # Prefixes of the error strings returned by the tool functions. Such
    # strings must never be mined for "the number in the answer".
    _TOOL_ERROR_PREFIXES = ("calc_error:", "yt_error", "search_error:")

    # A number inside free text, with an optional leading minus sign. The
    # look-behind plays the role of a left word boundary and additionally
    # refuses to start in the middle of a token such as "v1.2".
    _NUMBER_RE = re.compile(r"(?<![\w.])-?\d[\d,\.]*\b")

    # YouTube links in their common host variants.
    _YOUTUBE_URL_RE = re.compile(
        r"https?://(?:(?:www|m)\.)?(?:youtube\.com|youtu\.be)/\S+"
    )

    def __init__(self) -> None:
        """Create the chat model (``self.llm`` is None on failure) and compile the graph."""
        try:
            self.llm = ChatOpenAI(
                model=OPENAI_MODEL_NAME,
                temperature=OPENAI_MODEL_TEMPERATURE,
                api_key=os.getenv("OPENAI_API_KEY"),
            )
            print(f"Model name: '{self.llm.model_name}'")
            print(f"Model temperature: {self.llm.temperature}")
        except Exception as e:
            print(f"Warning: Could not initialize OpenAI model: {e}")
            self.llm = None
        # Following is defined for book-keeping purposes
        self.tools = [web_multi_search, calculator, youtube_transcript]
        self.graph = self._build_graph()

    def _build_graph(self) -> "StateGraph":
        """Build the LangGraph workflow and return it compiled."""
        workflow = StateGraph(AgentState)
        # Add nodes
        workflow.add_node("analyze_question", self._analyze_question)
        workflow.add_node("route", self._route)
        workflow.add_node("process_info", self._process_info)
        workflow.add_node("generate_answer", self._generate_answer)
        workflow.add_node("normalize_answer", self._normalize_answer)
        # Add edges
        workflow.set_entry_point("analyze_question")
        workflow.add_edge("analyze_question", "route")
        workflow.add_edge("route", "process_info")
        workflow.add_edge("process_info", "generate_answer")
        workflow.add_edge("generate_answer", "normalize_answer")
        workflow.add_edge("normalize_answer", END)
        return workflow.compile()

    # ------------------ HELPERS ------------------ #
    @staticmethod
    def _call_tool(tool_fn, **kwargs) -> str:
        """Call a tool whether it is a plain function or exposes ``.invoke``.

        Objects with a callable ``invoke`` attribute receive the keyword
        arguments as a single dict; plain callables receive them as keywords.
        """
        invoke = getattr(tool_fn, "invoke", None)
        if callable(invoke):
            return invoke(kwargs)
        return tool_fn(**kwargs)

    @staticmethod
    def _is_plain_number(text: str) -> bool:
        """Return True if ``text`` as a whole parses as a number (commas ignored)."""
        candidate = text.replace(",", "")
        for parse_as in (float, complex):
            try:
                parse_as(candidate)
            except ValueError:
                continue
            return True
        return False

    # ------------------ NODE IMPLEMENTATIONS ------------------ #
    def _analyze_question(self, state: "AgentState") -> "AgentState":
        """Start the reasoning trace with the question text."""
        q = state["question"]
        state["reasoning_steps"] = [f"ANALYZE: {q}"]
        return state

    def _route(self, state: "AgentState") -> "AgentState":
        """Pick and run one tool for the question.

        Order: calculator -> Excel attachment -> YouTube transcript -> web
        search. A path that fails records the failure in the reasoning trace
        and falls through to the next one.
        """
        question = state["question"]

        # 1. Calculator path
        if _needs_calc(question):
            # Keep only digits, dots, operators and parentheses (this also
            # removes all whitespace).
            expr = re.sub(r"[^\d\.\+\-\*/\(\)]", "", question)
            if expr:
                result = self._call_tool(calculator, expression=expr)
                if isinstance(result, str) and result.startswith("calc_error:"):
                    # Do not submit an error string as the answer; let the
                    # later paths (search + LLM) have a go instead.
                    state["reasoning_steps"].append(result)
                else:
                    state["answer"] = result
                    state["tools_used"].append("calculator")
                    state["reasoning_steps"].append(f"CALCULATE: {expr}")
                    return state

        # 2. Attachment (Excel file)
        lowered = question.lower()
        if "attached" in lowered and "excel" in lowered:
            try:
                task_id = state.get("task_id")
                file_url = f"{DEFAULT_API_URL}/files/{task_id}"
                response = requests.get(file_url, timeout=10)
                # Without this an HTML error page would be fed to read_excel.
                response.raise_for_status()
                df = pd.read_excel(BytesIO(response.content))
                # NOTE(review): assumes the sheet has a "sales" column and that
                # the question asks for its total - confirm against the tasks.
                total = df["sales"].sum()
                state["answer"] = f"{total:.2f}"
                state["tools_used"].append("excel_sum")
                state["reasoning_steps"].append("xlsx")
                return state
            except Exception as e:
                state["reasoning_steps"].append(f"xlsx_error:{e}")

        # 3. YouTube transcript path
        youtube_url = self._YOUTUBE_URL_RE.search(question)
        if youtube_url:
            transcript = self._call_tool(youtube_transcript, url=youtube_url.group(0))
            if transcript and not transcript.startswith("yt_error"):
                state["context"] = transcript
                state["tools_used"].append("youtube_transcript")
                state["reasoning_steps"].append("YouTube")
                return state
            # An error string is useless as LLM context: fall back to search.
            state["reasoning_steps"].append(f"YouTube failed: {transcript}")

        # 4. Web search path
        query = _extract_search_terms(question)
        results_json = self._call_tool(web_multi_search, query=query)
        state["search_results"] = results_json
        state["tools_used"].append("web_multi_search")
        state["reasoning_steps"].append(f"SEARCH: {query}")
        state["answer"] = ""
        return state

    def _process_info(self, state: "AgentState") -> "AgentState":
        """Turn raw search results into the LLM context unless already set."""
        if state["context"]:
            # Context already populated (e.g. YouTube transcript): keep it.
            state["reasoning_steps"].append("PROCESS(skip)")
            return state
        if state["answer"]:
            # A tool already produced the answer: nothing to summarize.
            state["context"] = ""
            return state
        # Summarize search results for the LLM
        summary = _summarize_results(state["search_results"])
        if not summary:
            summary = state["search_results"][:4000]  # cap to 4k chars
        state["context"] = summary
        state["reasoning_steps"].append("PROCESS")
        return state

    def _generate_answer(self, state: "AgentState") -> "AgentState":
        """Ask the LLM for the answer unless a tool already provided one.

        Raises:
            RuntimeError: If the LLM could not be initialized.
        """
        if state["answer"]:
            # calculator already filled it
            print("\nCalculator is used ==> No LLM is invoked.\n")
            return state
        if self.llm is None:
            raise RuntimeError(
                "LLM is not initialized (check OPENAI_API_KEY); cannot generate an answer"
            )
        prompt = [
            SystemMessage(content=self.SYSTEM_PROMPT),
            HumanMessage(
                content=(
                    f"Question: {state['question']}\n\n"
                    f"Context:\n{state['context']}\n\n"
                    f"Answer:"
                )
            ),
        ]
        response = self.llm.invoke(prompt)
        print(f">>> Raw response from LLM:\n{response}\n")
        state["answer"] = response.content.strip()
        state["reasoning_steps"].append("GENERATE ANSWER (LLM)")
        return state

    def _normalize_answer(self, state: "AgentState") -> "AgentState":
        """Reduce the answer to the short form expected by the scorer."""
        raw = state["answer"].strip()

        # 1. If the text contains a number, keep only the first one. Skipped
        #    when the whole answer already is a number (so "-5" and "1e+20"
        #    survive intact) and for tool error strings (whose digits are not
        #    an answer).
        # NOTE(review): this heuristic also shortens answers such as
        # "Apollo 11" to "11"; revisit if non-numeric answers get clipped.
        if not raw.startswith(self._TOOL_ERROR_PREFIXES) and not self._is_plain_number(raw):
            num = self._NUMBER_RE.search(raw)
            if num and len(raw) > len(num.group(0)):
                raw = num.group(0)

        # 2. Remove leading 'User:', 'Answer:', etc.
        raw = re.sub(r"^(User|Answer|Context):\s*", "", raw, flags=re.I)

        # 3. Strip trailing periods and surrounding whitespace
        raw = raw.rstrip(".").strip()

        if not raw:
            raw = "No answer found"
        state["answer"] = raw
        state["reasoning_steps"].append("NORMALIZED ANSWER")
        return state

    def __call__(self, question: str, task_id: str = "") -> str:
        """Main agent call method.

        Runs the graph for one question and returns the final answer. Any
        exception is caught and reported as
        ``"Error processing question: <message>"``.
        """
        print(100 * "-")
        print(f"GAIA Agent processing question: '{question}'")
        try:
            initial_state: "AgentState" = {
                "task_id": task_id,
                "question": question,
                "answer": "",
                "search_results": "",
                "context": "",
                "reasoning_steps": [],
                "tools_used": [],
            }
            # Run the graph
            final_state = self.graph.invoke(initial_state)
            answer = final_state["answer"]
            print(f"Agent reasoning: {' ==> '.join(final_state['reasoning_steps'])}")
            print(f"Agent's context {final_state['context']}")
            print(f"Tools used: {final_state['tools_used']}")
            print(f"Final answer: {answer}")
            return answer
        except Exception as e:
            print(f"Error in agent processing: {e}")
            return f"Error processing question: {str(e)}"
def run_and_submit_all(
    profile: gr.OAuthProfile | None,
) -> tuple[str, pd.DataFrame | None]:
    """Fetch all questions, run the GAIAAgent on them and submit the answers.

    Args:
        profile: The logged-in Hugging Face profile, or None when the user is
            not logged in.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a DataFrame with
        the columns "Task ID", "Question" and "Submitted Answer", or None when
        the run stopped before any question was processed.
    """
    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    # SPACE_ID is used to build the link to this Space's code.
    space_id = os.getenv("SPACE_ID")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent (modify this part to create your agent)
    try:
        agent = GAIAAgent()
        print("GAIA Agent initialized successfully")
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # When running as a Hugging Face Space this link points to the codebase
    # (useful for others, so please keep it public).
    if not space_id:
        print("Warning: SPACE_ID is not set; the submitted code link will not be valid.")
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        # Anything but a non-empty list cannot be iterated as questions below.
        if not isinstance(questions_data, list) or not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    # JSONDecodeError derives from RequestException, so it has to be handled
    # first; listed after its base class it could never be reached.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question=question_text, task_id=task_id)
        except Exception as e:
            # Failed tasks are logged for display but not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append(
                {
                    "Task ID": task_id,
                    "Question": question_text,
                    "Submitted Answer": f"AGENT ERROR: {e}",
                }
            )
            continue
        answers_payload.append(
            {"task_id": task_id, "submitted_answer": submitted_answer}
        )
        results_log.append(
            {
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": submitted_answer,
            }
        )

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        status_message = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except (ValueError, AttributeError):
            # Body is not JSON, or is JSON but not an object.
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)

    # Every outcome reports the per-question log alongside the status.
    return status_message, pd.DataFrame(results_log)
# --- Build Gradio Interface using Blocks ---
# Markdown shown under the page title (instructions and disclaimers).
_INSTRUCTIONS_MD = """
**Instructions:**
1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
---
**Disclaimers:**
Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions).
This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async.
"""

with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(_INSTRUCTIONS_MD)

    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")

    # Read-only box for the run / submission status text.
    status_output = gr.Textbox(
        label="Run Status / Submission Result",
        lines=5,
        interactive=False,
    )
    # Table of questions and the answers the agent submitted.
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # No explicit inputs are wired here; the callback's two return values
    # fill the status box and the results table.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
    )
if __name__ == "__main__":
    banner = "-" * 30 + " App Starting " + "-" * 30
    print("\n" + banner)

    # Report SPACE_HOST and SPACE_ID at startup, for information only.
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if not space_host_startup:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    else:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        # NOTE(review): if SPACE_HOST already ends in ".hf.space" this prints
        # the suffix twice - confirm against the Spaces documentation.
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")

    if not space_id_startup:
        print(
            "ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined."
        )
    else:
        # Repo URLs can only be derived when SPACE_ID is known.
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(
            f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main"
        )

    print("-" * (60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)